Completed
Push — try/aggressive-sync ( a30f77 )
by
unknown
42:57 queued 34:42
created

Abstract_Jetpack_Sync_Queue   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 54
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Importance

Changes 0
Metric Value
dl 0
loc 54
c 0
b 0
f 0
rs 10
wmc 5
lcom 1
cbo 1

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
add() 0 1 ?
add_all() 0 1 ?
lag() 0 1 ?
reset() 0 1 ?
size() 0 1 ?
has_any_items() 0 1 ?
checkout() 0 1 ?
checkout_with_memory_limit() 0 1 ?
checkin() 0 1 ?
close() 0 1 ?
A flush_all() 0 6 1
A get_all() 0 3 1
A peek() 0 8 2
fetch_items() 0 1 ?
force_checkin() 0 1 ?
lock() 0 1 ?
unlock() 0 1 ?
1
<?php
2
3
/**
 * A batch of queue rows that has been checked out of a queue, tagged with
 * the id of the checkout it belongs to.
 */
class Jetpack_Sync_Queue_Buffer {
	/**
	 * Identifier of the checkout that produced this buffer.
	 */
	public $id;

	/**
	 * The checked-out rows; each row is an object exposing `id` and `value`.
	 */
	public $items_with_ids;

	/**
	 * @param mixed $id             Checkout identifier for this buffer.
	 * @param array $items_with_ids Row objects (with `id` and `value` properties).
	 */
	public function __construct( $id, $items_with_ids ) {
		$this->items_with_ids = $items_with_ids;
		$this->id             = $id;
	}

	/**
	 * Returns the buffered rows as a map of row id => row value.
	 *
	 * @return array
	 */
	public function get_items() {
		$ids    = $this->get_item_ids();
		$values = $this->get_item_values();

		return array_combine( $ids, $values );
	}

	/**
	 * Returns only the `value` of every buffered row.
	 *
	 * @return array
	 */
	public function get_item_values() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/**
	 * Returns only the `id` of every buffered row.
	 *
	 * @return array
	 */
	public function get_item_ids() {
		$rows = $this->items_with_ids;

		return Jetpack_Sync_Utils::get_item_ids( $rows );
	}
}
27
28
/**
 * Common contract and shared read helpers for sync queue implementations.
 */
abstract class Abstract_Jetpack_Sync_Queue {
	/**
	 * Queue identifier, with every '-' replaced by '_'.
	 */
	public $id;

	/**
	 * @param string $id Queue identifier.
	 */
	public function __construct( $id ) {
		// Dashes are normalised to underscores; necessary to ensure we
		// don't have ID collisions in the SQL.
		$normalised_id = str_replace( '-', '_', $id );
		$this->id      = $normalised_id;
	}

	/** Adds a single item to the queue. */
	abstract public function add( $item );

	/** Adds every element of $items to the queue. */
	abstract public function add_all( $items );

	/** Reports how far behind the queue is; $now optionally overrides the current time. */
	abstract public function lag( $now = null );

	/** Removes everything from the queue. */
	abstract public function reset();

	/** Returns the number of items in the queue. */
	abstract public function size();

	/** Tells whether the queue holds at least one item. */
	abstract public function has_any_items();

	/** Checks out up to $buffer_size items as a buffer. */
	abstract public function checkout( $buffer_size );

	/**
	 * Checks out rows until the queue is emptied or a memory limit is hit.
	 *
	 * Implementations load the sizes from the DB first so that they do not
	 * accidentally load more data into memory than needed. The only way more
	 * than $max_memory is loaded is when a single queue item exceeds the
	 * limit; in that case that item is sent by itself.
	 */
	abstract public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 );

	/** Returns a checked-out buffer to the queue without removing its items. */
	abstract public function checkin( $buffer );

	/** Finishes a checkout, removing $ids_to_remove (or the whole buffer when null). */
	abstract public function close( $buffer, $ids_to_remove = null );

	/**
	 * Reads every item value, then empties the queue.
	 *
	 * @return array The values that were in the queue.
	 */
	public function flush_all() {
		$rows   = $this->fetch_items();
		$values = Jetpack_Sync_Utils::get_item_values( $rows );

		$this->reset();

		return $values;
	}

	/**
	 * Returns every row exactly as fetch_items() provides it.
	 */
	public function get_all() {
		$rows = $this->fetch_items();

		return $rows;
	}

	/**
	 * Looks at the first $count item values without checking them out.
	 *
	 * @param int $count How many items to look at.
	 * @return array Item values; an empty array when nothing was fetched.
	 */
	public function peek( $count = 1 ) {
		$rows = $this->fetch_items( $count );

		if ( ! $rows ) {
			return array();
		}

		return Jetpack_Sync_Utils::get_item_values( $rows );
	}

	/** Fetches row objects from the queue, optionally capped at $limit. */
	abstract protected function fetch_items( $limit = null );

	/**
	 * Use with caution: this could allow multiple processes to delete
	 * and send from the queue at the same time.
	 */
	abstract public function force_checkin();

	/**
	 * Used to lock checkouts from the queue.
	 * Tries to wait up to $timeout seconds for the queue to be empty.
	 */
	abstract public function lock( $timeout = 30 );

	/** Releases a lock taken with lock(). */
	abstract public function unlock();
}
85
86
/**
 * An in-memory version of the sync queue. Transiently buffers and attempts to send entire queue
 * at end of request. If there's an error sending, copies all items to the regular sync queue.
 *
 * Nothing is persisted: items live in a PHP array for the lifetime of the
 * object, so the locking-related methods are no-ops.
 */
class Jetpack_Memory_Sync_Queue extends Abstract_Jetpack_Sync_Queue {
	/**
	 * Queued rows; each is an object with `id` and `value` properties.
	 */
	private $items;

	/**
	 * Id handed to the next added row; incremented on every add().
	 */
	private $row_iterator;

	/**
	 * @param string $id Queue identifier (normalised by the parent constructor).
	 */
	public function __construct( $id ) {
		parent::__construct( $id );
		$this->reset();
		$this->row_iterator = 0;
	}

	/**
	 * Appends one item, tagging it with the next row id.
	 *
	 * @param mixed $item Value to enqueue.
	 */
	public function add( $item ) {
		$this->items[] = (object) array(
			'id'    => $this->row_iterator,
			'value' => $item,
		);
		$this->row_iterator += 1;
	}

	/**
	 * Appends every element of $items, in iteration order.
	 *
	 * @param array $items Values to enqueue.
	 */
	public function add_all( $items ) {
		foreach ( $items as $item ) {
			$this->add( $item );
		}
	}

	/**
	 * The in-memory queue always reports a lag of 0.
	 *
	 * @param float|null $now Unused.
	 * @return int
	 */
	public function lag( $now = null ) {
		return 0;
	}

	/**
	 * Drops every queued row. The row id counter is left untouched.
	 */
	public function reset() {
		$this->items = array();
	}

	/**
	 * @return int Number of queued rows.
	 */
	public function size() {
		return count( $this->items );
	}

	/**
	 * @return bool Whether at least one row is queued.
	 */
	public function has_any_items() {
		return $this->size() > 0;
	}

	/**
	 * Returns a buffer holding the first $buffer_size rows.
	 *
	 * @param int $buffer_size Maximum number of rows to return.
	 * @return Jetpack_Sync_Queue_Buffer|false False when the queue is empty.
	 */
	public function checkout( $buffer_size ) {
		if ( ! $this->has_any_items() ) {
			return false;
		}

		// since it's always in memory, no need to avoid conflicts
		$buffer_id = 1;

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $this->fetch_items( $buffer_size ) );
	}

	/**
	 * Returns a buffer holding every queued row; both limits are ignored
	 * because the data is already in memory.
	 *
	 * @param int $max_memory      Unused.
	 * @param int $max_buffer_size Unused.
	 * @return Jetpack_Sync_Queue_Buffer|false False when the queue is empty,
	 *                                         matching checkout().
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		// An empty queue yields false rather than an empty buffer, so callers
		// see the same "nothing to send" signal as from checkout().
		if ( ! $this->has_any_items() ) {
			return false;
		}

		// just give it everything
		return new Jetpack_Sync_Queue_Buffer( 1, $this->fetch_items() );
	}

	/**
	 * No checkout state is tracked in memory, so checking in always succeeds.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Unused.
	 * @return bool Always true.
	 */
	public function checkin( $buffer ) {
		return true;
	}

	/**
	 * Removes rows from the queue after a buffer has been processed.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer being closed.
	 * @param array|null                $ids_to_remove Row ids to drop; null drops
	 *                                                 every row in the buffer.
	 * @return bool Always true.
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		$remaining = array();
		foreach ( $this->items as $item ) {
			// Loose comparison is kept on purpose so ids passed back as
			// strings still match the integer ids assigned in add().
			if ( ! in_array( $item->id, $ids_to_remove ) ) {
				$remaining[] = $item;
			}
		}
		$this->items = $remaining;

		return true;
	}

	/**
	 * No-op: there is no checkout state to clear.
	 */
	public function force_checkin() {
		// noop
	}

	/**
	 * No-op: nothing to lock for an in-memory queue.
	 *
	 * @param int $timeout Unused.
	 * @return bool Always true.
	 */
	public function lock( $timeout = 30 ) {
		return true;
	}

	/**
	 * No-op counterpart of lock().
	 *
	 * @return bool Always true.
	 */
	public function unlock() {
		return true;
	}

	/**
	 * Returns the first $limit rows, or all rows when $limit is null.
	 *
	 * Left public (the base class declares it protected) so that existing
	 * callers of this method keep working.
	 *
	 * @param int|null $limit Maximum number of rows.
	 * @return array
	 */
	public function fetch_items( $limit = null ) {
		return array_slice( $this->items, 0, $limit );
	}
}
186
187
/**
 * A persistent queue that can be flushed in increments of N items,
 * and which blocks reads until checked-out buffers are checked in or
 * closed. This uses raw SQL for two reasons: speed, and not triggering
 * tons of added_option callbacks.
 *
 * Data rows are stored in the options table under names of the form
 * `jpsq_{id}-{timestamp}-{random}-{iterator}`. The checkout lock is a single
 * row named `jpsq_{id}_checkout` whose value is `{checkout_id}:{expiry}`.
 */
class Jetpack_Sync_Queue extends Abstract_Jetpack_Sync_Queue {
	/**
	 * Per-instance counter appended to generated row names.
	 */
	private $row_iterator;

	/**
	 * Random per-instance component of generated row names. Declared
	 * explicitly (it used to be created dynamically) and kept public so its
	 * visibility matches the former dynamic property.
	 */
	public $random_int;

	/**
	 * @param string $id Queue identifier (normalised by the parent constructor).
	 */
	public function __construct( $id ) {
		parent::__construct( $id );
		$this->row_iterator = 0;
		$this->random_int   = mt_rand( 1, 1000000 );
	}

	/**
	 * Inserts a single serialized item as a new options row.
	 *
	 * @param mixed $item Value to enqueue.
	 */
	public function add( $item ) {
		global $wpdb;
		$added = false;
		// this basically tries to add the option until enough time has elapsed that
		// it has a unique (microtime-based) option key
		// NOTE(review): the loop only repeats when the query reports exactly 0
		// rows; any other result (including a false return) ends it. Confirm
		// that a duplicate-key failure really surfaces as 0 here.
		while ( ! $added ) {
			$rows_added = $wpdb->query(
				$wpdb->prepare(
					"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
					$this->get_next_data_row_option_name(),
					serialize( $item ),
					'no'
				)
			);
			$added      = ( 0 !== $rows_added );
		}
	}

	/**
	 * Attempts to insert all the items in a single SQL query. May be subject
	 * to query size limits!
	 *
	 * @param array $items Values to enqueue.
	 * @return WP_Error|null WP_Error when the number of inserted rows differs
	 *                       from the number of items; nothing otherwise.
	 */
	public function add_all( $items ) {
		global $wpdb;

		// Re-index so associative or sparse input is handled like a list.
		$items      = array_values( $items );
		$item_count = count( $items );

		// An empty VALUES list is not valid SQL, so there is nothing to run.
		if ( 0 === $item_count ) {
			return;
		}

		$base_option_name = $this->get_next_data_row_option_name();

		$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

		// NOTE(review): the numeric suffix is not zero-padded, so in a batch of
		// more than 10 items "...-10" sorts before "...-2" under
		// ORDER BY option_name — confirm whether intra-batch order matters.
		$rows = array();
		foreach ( $items as $i => $item ) {
			$option_name  = esc_sql( $base_option_name . '-' . $i );
			$option_value = esc_sql( serialize( $item ) );
			$rows[]       = "('$option_name', '$option_value', 'no')";
		}

		$rows_added = $wpdb->query( $query . join( ',', $rows ) );

		// Report an error only when the counts differ (the check used to be inverted).
		if ( $item_count !== $rows_added ) {
			return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
		}
	}

	/**
	 * Peek at the front-most item(s) on the queue without checking them out.
	 *
	 * @param int $count How many items to look at.
	 * @return array Item values; empty when the queue is empty.
	 */
	public function peek( $count = 1 ) {
		// The base class implementation is identical to what used to be duplicated here.
		return parent::peek( $count );
	}

	/**
	 * lag is the difference in time between the age of the oldest item
	 * (aka first or frontmost item) and the current time.
	 *
	 * @param float|null $now Timestamp to measure against; defaults to microtime( true ).
	 * @return float|int 0 when the queue is empty or the row name cannot be parsed.
	 */
	public function lag( $now = null ) {
		global $wpdb;

		$first_item_name = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
				$this->get_data_row_like_pattern()
			)
		);

		if ( ! $first_item_name ) {
			return 0;
		}

		if ( null === $now ) {
			$now = microtime( true );
		}

		// break apart the item name to get the timestamp; the id is quoted so
		// that any regex metacharacters in it are matched literally
		$matches = null;
		$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';
		if ( preg_match( $pattern, $first_item_name, $matches ) ) {
			return $now - floatval( $matches[1] );
		}

		return 0;
	}

	/**
	 * Clears the checkout marker and deletes every data row of this queue.
	 */
	public function reset() {
		global $wpdb;
		$this->delete_checkout_id();
		$wpdb->query(
			$wpdb->prepare(
				"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
				$this->get_data_row_like_pattern()
			)
		);
	}

	/**
	 * @return int Number of data rows belonging to this queue.
	 */
	public function size() {
		global $wpdb;

		return (int) $wpdb->get_var(
			$wpdb->prepare(
				"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
				$this->get_data_row_like_pattern()
			)
		);
	}

	/**
	 * we use this peculiar implementation because it's much faster than count(*)
	 *
	 * @return bool Whether at least one data row exists.
	 */
	public function has_any_items() {
		global $wpdb;
		$value = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
				$this->get_data_row_like_pattern()
			)
		);

		// Cast first so both '1' and integer 1 count as "exists".
		return ( '1' === (string) $value );
	}

	/**
	 * Checks out up to $buffer_size rows and marks the queue as checked out.
	 *
	 * @param int $buffer_size Maximum number of rows to check out.
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false|int WP_Error when a
	 *         buffer is already checked out, the falsy result of
	 *         set_checkout_id() when the marker could not be written, false
	 *         when the queue is empty.
	 */
	public function checkout( $buffer_size ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		$items = $this->fetch_items( $buffer_size );

		if ( count( $items ) === 0 ) {
			// Nothing was handed out, so release the marker we just set;
			// otherwise the queue stays blocked until the marker expires.
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
	}

	/**
	 * this checks out rows until it either empties the queue or hits a certain memory limit
	 * it loads the sizes from the DB first so that it doesn't accidentally
	 * load more data into memory than it needs to.
	 * The only way it will load more items than $max_size is if a single queue item
	 * exceeds the memory limit, but in that case it will send that item by itself.
	 *
	 * @param int $max_memory      Maximum summed LENGTH(option_value) to check out.
	 * @param int $max_buffer_size Maximum number of rows to consider.
	 * @return Jetpack_Sync_Queue_Buffer|WP_Error|false|int Same failure values as checkout().
	 */
	public function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		$buffer_id = uniqid();

		$result = $this->set_checkout_id( $buffer_id );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		global $wpdb;

		// first load only the row names and the length of each value
		$items_with_size = $wpdb->get_results(
			$wpdb->prepare(
				"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
				$this->get_data_row_like_pattern(),
				$max_buffer_size
			),
			OBJECT
		);

		if ( count( $items_with_size ) === 0 ) {
			// Release the marker, as the later empty-result branch already does.
			$this->delete_checkout_id();

			return false;
		}

		$total_memory = 0;

		$min_item_id = $max_item_id = $items_with_size[0]->id;

		foreach ( $items_with_size as $index => $item_with_size ) {
			$total_memory += $item_with_size->value_size;

			// if this is the first item and it exceeds memory, allow loop to continue
			// we will exit on the next iteration instead
			if ( $total_memory > $max_memory && $index > 0 ) {
				break;
			}

			$max_item_id = $item_with_size->id;
		}

		// now load the actual values for the name range that fits
		$query = $wpdb->prepare(
			"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
			$min_item_id,
			$max_item_id
		);

		$items = $wpdb->get_results( $query, OBJECT );
		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		if ( count( $items ) === 0 ) {
			$this->delete_checkout_id();

			return false;
		}

		return new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
	}

	/**
	 * Returns a buffer to the queue without deleting its rows.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer Buffer obtained from a checkout.
	 * @return true|WP_Error
	 */
	public function checkin( $buffer ) {
		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		$this->delete_checkout_id();

		return true;
	}

	/**
	 * Deletes processed rows and releases the checkout.
	 *
	 * @param Jetpack_Sync_Queue_Buffer $buffer        Buffer obtained from a checkout.
	 * @param array|null                $ids_to_remove Row names to delete; null
	 *                                                 deletes every row in the buffer.
	 * @return true|WP_Error
	 */
	public function close( $buffer, $ids_to_remove = null ) {
		global $wpdb;

		$is_valid = $this->validate_checkout( $buffer );

		if ( is_wp_error( $is_valid ) ) {
			return $is_valid;
		}

		// by default clear all items in the buffer
		if ( is_null( $ids_to_remove ) ) {
			$ids_to_remove = $buffer->get_item_ids();
		}

		if ( count( $ids_to_remove ) > 0 ) {
			$placeholders = implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) );
			$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
			// array_values() keeps string keys from being passed on as argument names
			$query        = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), array_values( $ids_to_remove ) ) );
			$wpdb->query( $query );
		}

		// Release the checkout only after the rows are gone, so that another
		// checkout cannot pick up rows that are about to be deleted.
		$this->delete_checkout_id();

		return true;
	}

	/**
	 * use with caution, this could allow multiple processes to delete
	 * and send from the queue at the same time
	 */
	public function force_checkin() {
		$this->delete_checkout_id();
	}

	/**
	 * used to lock checkouts from the queue.
	 * tries to wait up to $timeout seconds for the queue to be empty
	 *
	 * @param int $timeout Maximum number of one-second waits.
	 * @return true|WP_Error|false|int WP_Error on timeout or when a buffer is
	 *         checked out; the falsy result of set_checkout_id() when the lock
	 *         marker could not be written.
	 */
	public function lock( $timeout = 30 ) {
		$tries = 0;

		while ( $this->has_any_items() && $tries < $timeout ) {
			sleep( 1 );
			$tries += 1;
		}

		// Compare against the caller's $timeout (not a fixed 30), and only
		// report a timeout when we actually waited and items still remain.
		if ( $tries > 0 && $tries >= $timeout && $this->has_any_items() ) {
			return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
		}

		if ( $this->get_checkout_id() ) {
			return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
		}

		// hopefully this means we can acquire a checkout?
		$result = $this->set_checkout_id( 'lock' );

		if ( ! $result || is_wp_error( $result ) ) {
			return $result;
		}

		return true;
	}

	/**
	 * Releases a lock taken with lock().
	 *
	 * @return int|false Result of the underlying UPDATE query.
	 */
	public function unlock() {
		return $this->delete_checkout_id();
	}

	/**
	 * This option is specifically chosen to, as much as possible, preserve time order
	 * and minimise the possibility of collisions between multiple processes working
	 * at the same time.
	 *
	 * @return string
	 */
	protected function generate_option_name_timestamp() {
		return sprintf( '%.6f', microtime( true ) );
	}

	/**
	 * Reads the current checkout marker.
	 *
	 * @return string|false The checkout id when a marker exists whose expiry
	 *                      is still in the future, false otherwise.
	 */
	private function get_checkout_id() {
		global $wpdb;
		$checkout_value = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
				$this->get_lock_option_name()
			)
		);

		if ( $checkout_value ) {
			// Pad so a value without ':' reads as "expired" instead of raising a notice.
			$parts = array_pad( explode( ':', $checkout_value ), 2, 0 );
			list( $checkout_id, $timestamp ) = $parts;
			if ( intval( $timestamp ) > time() ) {
				return $checkout_id;
			}
		}

		return false;
	}

	/**
	 * Writes the checkout marker as "{checkout_id}:{expiry}", updating the
	 * existing row when there is one and inserting it otherwise.
	 *
	 * @param string $checkout_id Identifier to store.
	 * @return int|false Result of the last query that was run.
	 */
	private function set_checkout_id( $checkout_id ) {
		global $wpdb;

		// NOTE(review): static analysis reported that
		// Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout is declared
		// private; that class is not visible here — confirm it is accessible.
		$expires     = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
		$updated_num = $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				"$checkout_id:$expires",
				$this->get_lock_option_name()
			)
		);

		if ( ! $updated_num ) {
			$updated_num = $wpdb->query(
				$wpdb->prepare(
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
					$this->get_lock_option_name(),
					"$checkout_id:$expires"
				)
			);
		}

		return $updated_num;
	}

	/**
	 * Clears the checkout marker by overwriting it with '0:0'.
	 *
	 * @return int|false Result of the UPDATE query.
	 */
	private function delete_checkout_id() {
		global $wpdb;
		// rather than delete, which causes fragmentation, we update in place
		return $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				'0:0',
				$this->get_lock_option_name()
			)
		);
	}

	/**
	 * @return string Name of the options row holding the checkout marker.
	 */
	private function get_lock_option_name() {
		return "jpsq_{$this->id}_checkout";
	}

	/**
	 * Builds the LIKE pattern matching this queue's data rows. The literal
	 * prefix is escaped so that '_' (a single-character wildcard in LIKE) and
	 * '%' in it are matched literally; only the trailing '%' is a wildcard.
	 *
	 * @return string
	 */
	private function get_data_row_like_pattern() {
		global $wpdb;

		return $wpdb->esc_like( "jpsq_{$this->id}-" ) . '%';
	}

	/**
	 * Generates the name for the next data row.
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		$timestamp = $this->generate_option_name_timestamp();

		// row iterator is used to avoid collisions where we're writing data waaay fast in a single process
		if ( $this->row_iterator === PHP_INT_MAX ) {
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
	}

	/**
	 * Loads data rows ordered by name, unserializing each value.
	 *
	 * @param int|null $limit Maximum number of rows; a falsy value loads all rows.
	 * @return array Row objects with `id` (option name) and `value` properties.
	 */
	protected function fetch_items( $limit = null ) {
		global $wpdb;

		$like = $this->get_data_row_like_pattern();

		if ( $limit ) {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", $like, $limit );
		} else {
			$query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", $like );
		}

		$items = $wpdb->get_results( $query_sql, OBJECT );
		foreach ( $items as $item ) {
			$item->value = maybe_unserialize( $item->value );
		}

		return $items;
	}

	/**
	 * Verifies that $buffer is the buffer currently checked out.
	 *
	 * @param mixed $buffer Value handed to checkin() or close().
	 * @return true|WP_Error
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
			return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// Compare as strings: a loose comparison could treat two different
		// numeric-looking ids as equal.
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
}
607
608
/**
 * Small helpers for pulling one property out of every queue row.
 */
class Jetpack_Sync_Utils {

	/**
	 * Collects the `value` property of every row.
	 *
	 * @param array $items Row objects exposing a `value` property.
	 * @return array The values, in the order (and under the keys) of $items.
	 */
	static function get_item_values( $items ) {
		$read_value = function ( $row ) {
			return $row->value;
		};

		return array_map( $read_value, $items );
	}

	/**
	 * Collects the `id` property of every row.
	 *
	 * @param array $items Row objects exposing an `id` property.
	 * @return array The ids, in the order (and under the keys) of $items.
	 */
	static function get_item_ids( $items ) {
		$read_id = function ( $row ) {
			return $row->id;
		};

		return array_map( $read_id, $items );
	}
}
626