Completed
Push — master-stable ( 53f101...a82972 )
by
unknown
86:26 queued 76:28
created

sync/class.jetpack-sync-sender.php (1 issue)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
require_once dirname( __FILE__ ) . '/class.jetpack-sync-queue.php';
4
require_once dirname( __FILE__ ) . '/class.jetpack-sync-defaults.php';
5
require_once dirname( __FILE__ ) . '/class.jetpack-sync-json-deflate-array-codec.php';
6
require_once dirname( __FILE__ ) . '/class.jetpack-sync-modules.php';
7
require_once dirname( __FILE__ ) . '/class.jetpack-sync-settings.php';
8
9
/**
10
 * This class grabs pending actions from the queue and sends them
11
 */
12
class Jetpack_Sync_Sender {

	/**
	 * Option-name prefix under which each queue's next allowed sync time is stored.
	 * The queue name is appended after an underscore.
	 */
	const NEXT_SYNC_TIME_OPTION_NAME = 'jetpack_next_sync_time';

	/** Seconds to wait before syncing a queue again after a 'wpcom_error' result. */
	const WPCOM_ERROR_SYNC_DELAY = 60;

	/** Seconds to wait before syncing a queue again after an 'unclosed_buffer' result. */
	const QUEUE_LOCKED_SYNC_DELAY = 10;

	/** @var int First limit passed to the queue's checkout_with_memory_limit(). */
	private $dequeue_max_bytes;

	/** @var int Cap, in bytes, on the accumulated encoded size of one upload. */
	private $upload_max_bytes;

	/** @var int Second limit (rows) passed to the queue's checkout_with_memory_limit(). */
	private $upload_max_rows;

	/** @var float Seconds the pre-send processing loop may run before it stops early. */
	private $max_dequeue_time;

	/** @var int Seconds to wait before the next sync once a sync ran longer than the threshold. */
	private $sync_wait_time;

	/** @var float Duration, in seconds, above which a sync triggers the wait time. */
	private $sync_wait_threshold;

	/** @var int Seconds between two full-sync enqueue continuations. */
	private $enqueue_wait_time;

	/** @var Jetpack_Sync_Queue Queue with the id 'sync'. */
	private $sync_queue;

	/** @var Jetpack_Sync_Queue Queue with the id 'full_sync'. */
	private $full_sync_queue;

	/** @var Jetpack_Sync_JSON_Deflate_Array_Codec Codec used to encode items before sending. */
	private $codec;

	/** @var Jetpack_Sync_Sender|null The shared instance, created on first use. */
	private static $instance;

	/**
	 * Returns the shared sender, creating it on the first call.
	 *
	 * @return Jetpack_Sync_Sender
	 */
	public static function get_instance() {
		if ( null === self::$instance ) {
			self::$instance = new self();
		}

		return self::$instance;
	}

	/**
	 * Builds the sender: loads defaults/settings, then prepares the modules.
	 *
	 * A constructor is needed because objects cannot be created with "new"
	 * in property declarations.
	 */
	protected function __construct() {
		$this->set_defaults();
		$this->init();
	}

	/**
	 * Calls init_before_send() on every registered sync module.
	 */
	private function init() {
		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$module->init_before_send();
		}
	}

	/**
	 * Reads the earliest time at which the named queue may sync again.
	 *
	 * @param string $queue_name Suffix identifying the queue (or throttle) the option belongs to.
	 *
	 * @return float Stored timestamp, or 0 when no option has been saved.
	 */
	public function get_next_sync_time( $queue_name ) {
		return (float) get_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, 0 );
	}

	/**
	 * Stores the earliest time at which the named queue may sync again.
	 *
	 * @param int|float $time       Timestamp to store.
	 * @param string    $queue_name Suffix identifying the queue (or throttle) the option belongs to.
	 *
	 * @return bool The result of update_option().
	 */
	public function set_next_sync_time( $time, $queue_name ) {
		return update_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, $time, true );
	}

	/**
	 * Continues enqueuing full-sync items (when allowed), then sends from the full-sync queue.
	 *
	 * @return true|WP_Error The result of do_sync_and_set_delays() for the full-sync queue.
	 */
	public function do_full_sync() {
		$this->continue_full_sync_enqueue();

		return $this->do_sync_and_set_delays( $this->full_sync_queue );
	}

	/**
	 * Asks the 'full-sync' module to continue enqueuing, unless importing or throttled,
	 * and records when enqueuing may next run.
	 *
	 * @return bool False when skipped (importing or throttled), true after enqueuing ran.
	 */
	private function continue_full_sync_enqueue() {
		if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
			return false;
		}

		if ( $this->get_next_sync_time( 'full-sync-enqueue' ) > microtime( true ) ) {
			return false;
		}

		Jetpack_Sync_Modules::get_module( 'full-sync' )->continue_enqueuing();

		$this->set_next_sync_time( time() + $this->get_enqueue_wait_time(), 'full-sync-enqueue' );

		return true;
	}

	/**
	 * Sends pending items from the regular sync queue.
	 *
	 * @return true|WP_Error The result of do_sync_and_set_delays() for the sync queue.
	 */
	public function do_sync() {
		return $this->do_sync_and_set_delays( $this->sync_queue );
	}

	/**
	 * Sends from the given queue and, depending on the outcome, schedules a delay
	 * before that queue may be synced again.
	 *
	 * @param Jetpack_Sync_Queue $queue The queue to send from.
	 *
	 * @return true|WP_Error True on success; WP_Error 'is_importing' or 'sync_throttled'
	 *                       when skipped; otherwise the result of do_sync_for_queue().
	 */
	public function do_sync_and_set_delays( $queue ) {
		// don't sync if importing
		if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
			return new WP_Error( 'is_importing' );
		}

		// don't sync if we are throttled
		if ( $this->get_next_sync_time( $queue->id ) > microtime( true ) ) {
			return new WP_Error( 'sync_throttled' );
		}

		$start_time = microtime( true );

		Jetpack_Sync_Settings::set_is_syncing( true );
		$sync_result = $this->do_sync_for_queue( $queue );
		Jetpack_Sync_Settings::set_is_syncing( false );

		$exceeded_sync_wait_threshold = ( microtime( true ) - $start_time ) > (float) $this->get_sync_wait_threshold();

		if ( is_wp_error( $sync_result ) ) {
			$error_code = $sync_result->get_error_code();

			// only these two error codes cause a delay; other errors leave the schedule untouched
			if ( 'unclosed_buffer' === $error_code ) {
				$this->set_next_sync_time( time() + self::QUEUE_LOCKED_SYNC_DELAY, $queue->id );
			} elseif ( 'wpcom_error' === $error_code ) {
				$this->set_next_sync_time( time() + self::WPCOM_ERROR_SYNC_DELAY, $queue->id );
			}
		} elseif ( $exceeded_sync_wait_threshold ) {
			// if we actually sent data and it took a while, wait before sending again
			$this->set_next_sync_time( time() + $this->get_sync_wait_time(), $queue->id );
		}

		return $sync_result;
	}

	/**
	 * Filters and (optionally) encodes the items of a checked-out buffer, stopping
	 * once the upload size or the processing time limit is reached.
	 *
	 * @param object $buffer A checked-out queue buffer exposing get_items().
	 * @param bool   $encode Whether to encode each item with the codec. Default true.
	 *
	 * @return array {
	 *     @type array $0 Items to send, keyed by item key.
	 *     @type array $1 Keys of items whose filter returned false (skipped).
	 *     @type array $2 All items of the buffer, unmodified.
	 *     @type float $3 Seconds spent in this method.
	 * }
	 */
	public function get_items_to_send( $buffer, $encode = true ) {
		// track how long we've been processing so we can avoid request timeouts
		$start_time    = microtime( true );
		$upload_size   = 0;
		$items_to_send = array();
		$items         = $buffer->get_items();

		// set up current screen to avoid errors rendering content
		require_once( ABSPATH . 'wp-admin/includes/class-wp-screen.php' );
		require_once( ABSPATH . 'wp-admin/includes/screen.php' );
		set_current_screen( 'sync' );

		$skipped_items_ids = array();

		// we estimate the total encoded size as we go by encoding each item individually
		// this is expensive, but the only way to really know :/
		foreach ( $items as $key => $item ) {
			// Suspending cache addition helps prevent overloading the in-memory cache of large sites.
			wp_suspend_cache_addition( true );
			/**
			 * Modify the data within an action before it is serialized and sent to the server
			 * For example, during full sync this expands Post ID's into full Post objects,
			 * so that we don't have to serialize the whole object into the queue.
			 *
			 * @since 4.2.0
			 *
			 * @param array The action parameters
			 * @param int The ID of the user who triggered the action
			 */
			$item[1] = apply_filters( 'jetpack_sync_before_send_' . $item[0], $item[1], $item[2] );
			wp_suspend_cache_addition( false );

			// a filter returning false means "do not send this item"
			if ( false === $item[1] ) {
				$skipped_items_ids[] = $key;
				continue;
			}

			$encoded_item = $encode ? $this->codec->encode( $item ) : $item;

			// Only string payloads have a byte length. Calling strlen() on an un-encoded
			// (array) item warns on older PHP and throws a TypeError on PHP 8, so such
			// items contribute nothing to the size estimate.
			if ( is_string( $encoded_item ) ) {
				$upload_size += strlen( $encoded_item );
			}

			// always send at least one item, even if it alone exceeds the limit
			if ( $upload_size > $this->upload_max_bytes && count( $items_to_send ) > 0 ) {
				break;
			}

			$items_to_send[ $key ] = $encoded_item;

			if ( microtime( true ) - $start_time > $this->max_dequeue_time ) {
				break;
			}
		}

		return array( $items_to_send, $skipped_items_ids, $items, microtime( true ) - $start_time );
	}

	/**
	 * Checks a buffer out of the queue, sends its items through the
	 * 'jetpack_sync_send_data' filter, and closes or checks the buffer back in
	 * depending on the outcome.
	 *
	 * @param Jetpack_Sync_Queue $queue The queue to send from.
	 *
	 * @return true|WP_Error True when everything returned by the send filter was processed.
	 *                       WP_Error 'empty_queue_{id}' or 'empty_buffer' when there is nothing
	 *                       to send, the checkout error when checkout fails, or 'wpcom_error'
	 *                       when sending failed (fully or partially).
	 */
	public function do_sync_for_queue( $queue ) {
		do_action( 'jetpack_sync_before_send_queue_' . $queue->id );

		if ( $queue->size() === 0 ) {
			return new WP_Error( 'empty_queue_' . $queue->id );
		}

		// now that we're sure we are about to sync, try to
		// ignore user abort so we can avoid getting into a
		// bad state
		if ( function_exists( 'ignore_user_abort' ) ) {
			ignore_user_abort( true );
		}

		$checkout_start_time = microtime( true );

		$buffer = $queue->checkout_with_memory_limit( $this->dequeue_max_bytes, $this->upload_max_rows );

		if ( ! $buffer ) {
			// buffer has no items
			return new WP_Error( 'empty_buffer' );
		}

		if ( is_wp_error( $buffer ) ) {
			return $buffer;
		}

		$checkout_duration = microtime( true ) - $checkout_start_time;

		list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $buffer, true );

		Jetpack_Sync_Settings::set_is_sending( true );
		/**
		 * Fires when data is ready to send to the server.
		 * Return false or WP_Error to abort the sync (e.g. if there's an error)
		 * The items will be automatically re-sent later
		 *
		 * @since 4.2.0
		 *
		 * @param array  $data                The action buffer
		 * @param string $codec               The codec name used to encode the data
		 * @param double $time                The current time
		 * @param string $queue               The queue used to send ('sync' or 'full_sync')
		 * @param float  $checkout_duration   Seconds spent checking the buffer out of the queue
		 * @param float  $preprocess_duration Seconds spent preparing the items for sending
		 */
		$processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->codec->name(), microtime( true ), $queue->id, $checkout_duration, $preprocess_duration );
		Jetpack_Sync_Settings::set_is_sending( false );

		if ( ! $processed_item_ids || is_wp_error( $processed_item_ids ) ) {
			// nothing was sent: put the buffer back so the items are retried later
			$checked_in_item_ids = $queue->checkin( $buffer );

			if ( is_wp_error( $checked_in_item_ids ) ) {
				error_log( 'Error checking in buffer: ' . $checked_in_item_ids->get_error_message() );
				$queue->force_checkin();
			}

			// returning a WP_Error('wpcom_error') is a sign to the caller that we should wait a while
			// before syncing again
			if ( is_wp_error( $processed_item_ids ) ) {
				return new WP_Error( 'wpcom_error', $processed_item_ids->get_error_code() );
			}

			return new WP_Error( 'wpcom_error', 'jetpack_sync_send_data_false' );
		}

		// Detect if the last item ID was an error. $wp_error is initialised up front so
		// that it is defined on every path that reads it below.
		$wp_error = null;
		if ( is_wp_error( end( $processed_item_ids ) ) ) {
			$wp_error = array_pop( $processed_item_ids );
		}

		// also checkin any items that were skipped
		if ( count( $skipped_items_ids ) > 0 ) {
			$processed_item_ids = array_merge( $processed_item_ids, $skipped_items_ids );
		}

		$processed_items = array_intersect_key( $items, array_flip( $processed_item_ids ) );

		/**
		 * Allows us to keep track of all the actions that have been sent.
		 * Allows us to calculate the progress of specific actions.
		 *
		 * @since 4.2.0
		 *
		 * @param array $processed_actions The actions that we send successfully.
		 */
		do_action( 'jetpack_sync_processed_actions', $processed_items );

		$queue->close( $buffer, $processed_item_ids );

		// returning a WP_Error is a sign to the caller that we should wait a while
		// before syncing again
		if ( null !== $wp_error ) {
			return new WP_Error( 'wpcom_error', $wp_error->get_error_code() );
		}

		return true;
	}

	/**
	 * @return Jetpack_Sync_Queue The regular sync queue.
	 */
	public function get_sync_queue() {
		return $this->sync_queue;
	}

	/**
	 * @return Jetpack_Sync_Queue The full-sync queue.
	 */
	public function get_full_sync_queue() {
		return $this->full_sync_queue;
	}

	/**
	 * @return Jetpack_Sync_JSON_Deflate_Array_Codec The codec used to encode items.
	 */
	public function get_codec() {
		return $this->codec;
	}

	/**
	 * Computes the replica store's checksum_all() and passes it to the
	 * 'jetpack_sync_checksum' action.
	 */
	public function send_checksum() {
		// resolved relative to this file, like the other class includes at the top of the file
		require_once dirname( __FILE__ ) . '/class.jetpack-sync-wp-replicastore.php';

		$store = new Jetpack_Sync_WP_Replicastore();
		do_action( 'jetpack_sync_checksum', $store->checksum_all() );
	}

	/**
	 * Resets the regular sync queue.
	 */
	public function reset_sync_queue() {
		$this->sync_queue->reset();
	}

	/**
	 * Resets the full-sync queue.
	 */
	public function reset_full_sync_queue() {
		$this->full_sync_queue->reset();
	}

	/**
	 * @param int $size Limit handed to the queue as the first argument of checkout_with_memory_limit().
	 */
	public function set_dequeue_max_bytes( $size ) {
		$this->dequeue_max_bytes = $size;
	}

	/**
	 * @param int $max_bytes Maximum accumulated encoded upload size, in bytes.
	 */
	public function set_upload_max_bytes( $max_bytes ) {
		$this->upload_max_bytes = $max_bytes;
	}

	/**
	 * @param int $max_rows Maximum number of rows to check out of the queue at once.
	 */
	public function set_upload_max_rows( $max_rows ) {
		$this->upload_max_rows = $max_rows;
	}

	/**
	 * @param int $seconds Delay, in seconds, applied after a sync that exceeded the wait threshold.
	 */
	public function set_sync_wait_time( $seconds ) {
		$this->sync_wait_time = $seconds;
	}

	/**
	 * @return int Delay, in seconds, applied after a sync that exceeded the wait threshold.
	 */
	public function get_sync_wait_time() {
		return $this->sync_wait_time;
	}

	/**
	 * @param int $seconds Delay, in seconds, between full-sync enqueue continuations.
	 */
	public function set_enqueue_wait_time( $seconds ) {
		$this->enqueue_wait_time = $seconds;
	}

	/**
	 * @return int Delay, in seconds, between full-sync enqueue continuations.
	 */
	public function get_enqueue_wait_time() {
		return $this->enqueue_wait_time;
	}

	/**
	 * @param int|float $seconds Sync duration, in seconds, above which the wait time is applied.
	 */
	public function set_sync_wait_threshold( $seconds ) {
		$this->sync_wait_threshold = $seconds;
	}

	/**
	 * @return int|float Sync duration, in seconds, above which the wait time is applied.
	 */
	public function get_sync_wait_threshold() {
		return $this->sync_wait_threshold;
	}

	/**
	 * @param int|float $seconds Maximum time, in seconds, the pre-send processing loop may run.
	 */
	public function set_max_dequeue_time( $seconds ) {
		$this->max_dequeue_time = $seconds;
	}

	/**
	 * Creates the queues and codec, then loads every limit from the saved sync
	 * settings (and the maximum execution time from the defaults).
	 */
	public function set_defaults() {
		$this->sync_queue      = new Jetpack_Sync_Queue( 'sync' );
		$this->full_sync_queue = new Jetpack_Sync_Queue( 'full_sync' );
		$this->codec           = new Jetpack_Sync_JSON_Deflate_Array_Codec();

		// saved settings
		// NOTE(review): presumably clears a cached "importing" flag before the settings are read - confirm.
		Jetpack_Sync_Settings::set_importing( null );
		$settings = Jetpack_Sync_Settings::get_settings();

		$this->set_dequeue_max_bytes( $settings['dequeue_max_bytes'] );
		$this->set_upload_max_bytes( $settings['upload_max_bytes'] );
		$this->set_upload_max_rows( $settings['upload_max_rows'] );
		$this->set_sync_wait_time( $settings['sync_wait_time'] );
		$this->set_enqueue_wait_time( $settings['enqueue_wait_time'] );
		$this->set_sync_wait_threshold( $settings['sync_wait_threshold'] );
		$this->set_max_dequeue_time( Jetpack_Sync_Defaults::get_max_sync_execution_time() );
	}

	/**
	 * Resets both queues, every module's data, the stored next-sync times and
	 * the sync settings.
	 */
	public function reset_data() {
		$this->reset_sync_queue();
		$this->reset_full_sync_queue();

		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$module->reset_data();
		}

		// 'full-sync-enqueue' is the throttle written by continue_full_sync_enqueue();
		// it has to be cleared together with the two per-queue throttles.
		foreach ( array( 'sync', 'full_sync', 'full-sync-enqueue' ) as $queue_name ) {
			delete_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name );
		}

		Jetpack_Sync_Settings::reset_data();
	}

	/**
	 * Removes everything this class stores: sync data, the full sync status
	 * option and the scheduled sync cron hooks.
	 */
	public function uninstall() {
		// Lets delete all the other fun stuff like transient and option and the sync queue
		$this->reset_data();

		// delete the full sync status
		delete_option( 'jetpack_full_sync_status' );

		// clear the sync cron.
		wp_clear_scheduled_hook( 'jetpack_sync_cron' );
		wp_clear_scheduled_hook( 'jetpack_sync_full_cron' );
	}
}
368