Completed
Push — update/calypso-post-previews ( b8001f...c8a585 )
by
unknown
11:03
created

Jetpack_Sync_Module_Full_Sync::queue_item_added()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 3
rs 10
1
<?php
2
3
/**
 * This class does a full resync of the database by
 * enqueuing an outbound action for every single object
 * that we care about.
 *
 * This class, and its related class Jetpack_Sync_Module, contain a few non-obvious optimisations that should be explained:
 * - we fire an action called jetpack_full_sync_start so that WPCOM can erase the contents of the cached database
 * - for each object type, we page through the object IDs and enqueue them by firing some monitored actions
 * - we load the full objects for those IDs in chunks of Jetpack_Sync_Module::ARRAY_CHUNK_SIZE (to reduce the number of MySQL calls)
 * - we fire a trigger for the entire array which the Jetpack_Sync_Listener then serializes and queues.
 */
14
15
require_once 'class.jetpack-sync-wp-replicastore.php';
16
17
/**
 * Sync module that drives a "full sync": it asks every registered sync module
 * to estimate and enqueue its items, records progress in prefixed WordPress
 * options, and rate-limits queue writes while enqueuing.
 */
class Jetpack_Sync_Module_Full_Sync extends Jetpack_Sync_Module {
	/**
	 * Prefix for every status option. Option names are built as
	 * "{prefix}_{name}", so the stored names contain a double underscore;
	 * this is kept as-is so previously stored options keep being found.
	 */
	const STATUS_OPTION_PREFIX = 'jetpack_sync_full_';

	// NOTE(review): not referenced inside this class; presumably a timeout in seconds - confirm against callers.
	const FULL_SYNC_TIMEOUT = 3600;

	/** Number of queue writes counted since the current rate-limit window began. */
	private $items_added_since_last_pause;

	/** microtime( true ) timestamp marking the start of the current rate-limit window. */
	private $last_pause_time;

	/** Value of the 'queue_max_writes_sec' sync setting, read when rate limiting is enabled. */
	private $queue_rate_limit;

	/**
	 * Identifier of this sync module.
	 *
	 * @return string
	 */
	public function name() {
		return 'full-sync';
	}

	/**
	 * Hooks the given callable onto the synthetic full sync actions.
	 *
	 * @param callable $callable Handler attached to the start, end and cancelled actions.
	 */
	public function init_full_sync_listeners( $callable ) {
		$synthetic_actions = array(
			'jetpack_full_sync_start',
			'jetpack_full_sync_end',
			'jetpack_full_sync_cancelled',
		);

		foreach ( $synthetic_actions as $action_name ) {
			add_action( $action_name, $callable );
		}
	}

	/**
	 * Registers the progress tracker, which is triggered after actions have
	 * been processed on the server.
	 */
	public function init_before_send() {
		add_action( 'jetpack_sync_processed_actions', array( $this, 'update_sent_progress_action' ) );
	}

	/**
	 * Runs a full sync: clears previous state, announces the start, enqueues
	 * every enabled module's items and finally announces the end.
	 *
	 * @param array|null $modules Optional map of module name => module config. Modules
	 *                            missing from a non-empty map are disabled; when the map
	 *                            is empty or not an array, every module is enabled.
	 *
	 * @return bool Always true.
	 */
	public function start( $modules = null ) {
		// must be captured before reset_data() wipes the status options
		$was_already_running = $this->is_started() && ! $this->is_finished();

		// remove all evidence of previous full sync items and status
		$this->reset_data();

		$this->enable_queue_rate_limit();

		if ( $was_already_running ) {
			/**
			 * Fires when a full sync is cancelled.
			 *
			 * @since 4.2.0
			 */
			do_action( 'jetpack_full_sync_cancelled' );
		}

		/**
		 * Fires when a full sync begins. This action is serialized
		 * and sent to the server so that it knows a full sync is coming.
		 *
		 * @since 4.2.0
		 */
		do_action( 'jetpack_full_sync_start', $modules );
		$this->update_status_option( 'started', time() );

		// configure modules
		if ( ! is_array( $modules ) ) {
			$modules = array();
		}

		// by default (no explicit configuration), all modules are fully enabled
		$default_module_config = ( 0 === count( $modules ) );

		// first pass: fill in default configuration, calculate totals,
		// and save the configuration for modules whose total is > 0
		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$module_name = $module->name();

			if ( ! isset( $modules[ $module_name ] ) ) {
				$modules[ $module_name ] = $default_module_config;
			}

			$module_config = $modules[ $module_name ];

			// skip disabled modules
			if ( ! $module_config ) {
				continue;
			}

			$total_items = $module->estimate_full_sync_actions( $module_config );

			if ( null !== $total_items && $total_items > 0 ) {
				$this->update_status_option( "{$module_name}_total", $total_items );
				$this->update_status_option( "{$module_name}_config", $module_config );
			}
		}

		// second pass: actually enqueue the items of every enabled module
		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$module_name   = $module->name();
			$module_config = $modules[ $module_name ];

			// skip disabled modules
			if ( ! $module_config ) {
				continue;
			}

			$items_enqueued = $module->enqueue_full_sync_actions( $module_config );

			if ( null !== $items_enqueued && $items_enqueued > 0 ) {
				$this->update_status_option( "{$module_name}_queued", $items_enqueued );
			}
		}

		$this->update_status_option( 'queue_finished', time() );

		$store = new Jetpack_Sync_WP_Replicastore();

		/**
		 * Fires when a full sync ends. This action is serialized
		 * and sent to the server with checksums so that we can confirm the
		 * sync was successful.
		 *
		 * @since 4.2.0
		 */
		do_action( 'jetpack_full_sync_end', $store->checksum_all() );

		$this->disable_queue_rate_limit();

		return true;
	}

	/**
	 * Updates the "sent" progress counters from a batch of processed actions.
	 *
	 * Does nothing unless a full sync has started and has not yet finished.
	 *
	 * @param array $actions Processed actions; each item is expected to be an array
	 *                       whose first element is the action name.
	 */
	public function update_sent_progress_action( $actions ) {
		// check state first so that no work is done on the batch when no full sync is running
		if ( ! $this->is_started() || $this->is_finished() ) {
			return;
		}

		$actions_with_counts = $this->count_actions_by_name( $actions );

		if ( isset( $actions_with_counts['jetpack_full_sync_start'] ) ) {
			$this->update_status_option( 'sent_started', time() );
		}

		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$status_option_name = "{$module->name()}_sent";
			$items_sent         = $this->get_status_option( $status_option_name, 0 );

			foreach ( $module->get_full_sync_actions() as $module_action ) {
				if ( isset( $actions_with_counts[ $module_action ] ) ) {
					$items_sent += $actions_with_counts[ $module_action ];
				}
			}

			if ( $items_sent > 0 ) {
				$this->update_status_option( $status_option_name, $items_sent );
			}
		}

		if ( isset( $actions_with_counts['jetpack_full_sync_end'] ) ) {
			$this->update_status_option( 'finished', time() );
		}
	}

	/**
	 * Tallies how many times each action name occurs in a batch of actions.
	 *
	 * reset() takes its argument by reference, so it is called on a real local
	 * variable here rather than being handed to array_map() as a callback.
	 * Items that are not non-empty arrays, or whose first element is neither a
	 * string nor an integer, are skipped instead of triggering warnings.
	 *
	 * @param mixed $actions Batch of actions, each an array starting with the action name.
	 *
	 * @return array Map of action name => number of occurrences.
	 */
	private function count_actions_by_name( $actions ) {
		$counts = array();

		if ( ! is_array( $actions ) ) {
			return $counts;
		}

		foreach ( $actions as $action ) {
			if ( ! is_array( $action ) || empty( $action ) ) {
				continue;
			}

			$action_name = reset( $action );

			if ( ! is_string( $action_name ) && ! is_int( $action_name ) ) {
				continue;
			}

			if ( isset( $counts[ $action_name ] ) ) {
				$counts[ $action_name ] += 1;
			} else {
				$counts[ $action_name ] = 1;
			}
		}

		return $counts;
	}

	/**
	 * Whether a 'started' status has been recorded.
	 *
	 * @return bool
	 */
	public function is_started() {
		return (bool) $this->get_status_option( 'started' );
	}

	/**
	 * Whether a 'finished' status has been recorded.
	 *
	 * @return bool
	 */
	public function is_finished() {
		return (bool) $this->get_status_option( 'finished' );
	}

	/**
	 * Collects the stored status values, plus the per-module counters and
	 * configuration for every module that has a non-empty value stored.
	 *
	 * @return array Keys: started, queue_finished, sent_started, finished,
	 *               and the per-module maps sent, queue, config, total.
	 */
	public function get_status() {
		$status = array(
			'started'        => $this->get_status_option( 'started' ),
			'queue_finished' => $this->get_status_option( 'queue_finished' ),
			'sent_started'   => $this->get_status_option( 'sent_started' ),
			'finished'       => $this->get_status_option( 'finished' ),
			'sent'           => array(),
			'queue'          => array(),
			'config'         => array(),
			'total'          => array(),
		);

		// status key => per-module option suffix, listed in the order the options are read
		$per_module_fields = array(
			'total'  => 'total',
			'queue'  => 'queued',
			'sent'   => 'sent',
			'config' => 'config',
		);

		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$name = $module->name();

			foreach ( $per_module_fields as $status_key => $suffix ) {
				$value = $this->get_status_option( "{$name}_{$suffix}" );

				// only modules with a stored, non-empty value are reported
				if ( $value ) {
					$status[ $status_key ][ $name ] = $value;
				}
			}
		}

		return $status;
	}

	/**
	 * Deletes every status option written by this class, both the global ones
	 * and the per-module ones.
	 */
	public function clear_status() {
		$prefix = self::STATUS_OPTION_PREFIX;

		foreach ( array( 'started', 'queue_finished', 'sent_started', 'finished' ) as $global_name ) {
			delete_option( "{$prefix}_{$global_name}" );
		}

		foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
			$module_name = $module->name();

			foreach ( array( 'total', 'queued', 'sent', 'config' ) as $suffix ) {
				delete_option( "{$prefix}_{$module_name}_{$suffix}" );
			}
		}
	}

	/**
	 * Clears the stored status and resets the listener's full sync queue.
	 */
	public function reset_data() {
		$this->clear_status();

		// the listener class is loaded on demand, right before it is needed
		require_once dirname( __FILE__ ) . '/class.jetpack-sync-listener.php';

		$full_sync_queue = Jetpack_Sync_Listener::get_instance()->get_full_sync_queue();
		$full_sync_queue->reset();
	}

	/**
	 * Reads one status option.
	 *
	 * @param string $option  Option name without the prefix.
	 * @param mixed  $default Returned when the stored value is missing or falsy.
	 *
	 * @return mixed $default for a missing/falsy value, an int for numeric values,
	 *               otherwise the stored value unchanged.
	 */
	private function get_status_option( $option, $default = null ) {
		$value = get_option( self::STATUS_OPTION_PREFIX . "_{$option}", $default );

		if ( ! $value ) {
			// don't cast to int if we didn't find a value - we want to preserve null or false as sentinels
			return $default;
		}

		if ( is_numeric( $value ) ) {
			return intval( $value );
		}

		return $value;
	}

	/**
	 * Stores one status option; false is passed as update_option()'s third argument.
	 *
	 * @param string $name  Option name without the prefix.
	 * @param mixed  $value Value to store.
	 */
	private function update_status_option( $name, $value ) {
		update_option( self::STATUS_OPTION_PREFIX . "_{$name}", $value, false );
	}

	/**
	 * Starts rate limiting: reads the limit, opens a new counting window and
	 * listens for queue writes.
	 */
	private function enable_queue_rate_limit() {
		$this->queue_rate_limit             = Jetpack_Sync_Settings::get_setting( 'queue_max_writes_sec' );
		$this->items_added_since_last_pause = 0;
		$this->last_pause_time              = microtime( true );

		add_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) );
		add_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) );
	}

	/**
	 * Stops rate limiting by removing the queue write listeners.
	 */
	private function disable_queue_rate_limit() {
		remove_action( 'jpsq_item_added', array( $this, 'queue_item_added' ) );
		remove_action( 'jpsq_items_added', array( $this, 'queue_items_added' ) );
	}

	/**
	 * Handler for 'jpsq_item_added': counts a single queue write.
	 */
	public function queue_item_added() {
		$this->queue_items_added( 1 );
	}

	/**
	 * Handler for 'jpsq_items_added': counts one queue write and, once the
	 * per-window limit is exceeded, sleeps for whatever is left of the second.
	 *
	 * @param int $item_count Intentionally unused: jpsq_item_added and jpsq_items_added
	 *                        both exec 1 db query, so every call is counted as 1.
	 *                        The parameter is kept so the hook signature stays unchanged.
	 */
	public function queue_items_added( $item_count ) {
		$this->items_added_since_last_pause += 1;

		if ( $this->items_added_since_last_pause > $this->queue_rate_limit ) {
			// sleep for the rest of the second
			$sleep_duration = ( $this->last_pause_time + 1.0 ) - microtime( true );

			if ( $sleep_duration > 0.0 ) {
				// usleep() expects whole microseconds
				usleep( (int) ( $sleep_duration * 1000000 ) );
			}

			// Always open a new window here, even when no sleep was needed. Otherwise
			// the timestamp goes stale after the first slow window and every later
			// burst computes a negative sleep, silently disabling the rate limit.
			$this->last_pause_time              = microtime( true );
			$this->items_added_since_last_pause = 0;
		}
	}
}
285