Completed
Push — update/sync-psr4-queue ( 404e00...b3201d )
by
unknown
80:35 queued 71:27
created

Listener::init()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
nc 2
nop 0
dl 0
loc 19
rs 9.6333
c 0
b 0
f 0
1
<?php
2
3
namespace Automattic\Jetpack\Sync;
4
use Automattic\Jetpack\Sync\Queue;
5
6
/**
7
 * Listens for WordPress actions and adds them to the sync queues so they can be sent.
8
 */
9
class Listener {
10
	const QUEUE_STATE_CHECK_TRANSIENT = 'jetpack_sync_last_checked_queue_state';
11
	const QUEUE_STATE_CHECK_TIMEOUT   = 300; // 5 minutes
12
13
	private $sync_queue;
14
	private $full_sync_queue;
15
	private $sync_queue_size_limit;
16
	private $sync_queue_lag_limit;
17
18
	// singleton functions
19
	private static $instance;
20
21
	/**
	 * Returns the shared Listener instance, creating it on first use.
	 *
	 * @return Listener The single instance of this class.
	 */
	public static function get_instance() {
		if ( ! isset( self::$instance ) ) {
			self::$instance = new self();
		}

		return self::$instance;
	}
28
29
	/**
	 * Sets the listener up.
	 *
	 * Objects cannot be created with "new" in a property declaration, so the
	 * queues are assigned in set_defaults() before the hooks are registered.
	 */
	protected function __construct() {
		\Jetpack_Sync_Main::init();
		$this->set_defaults();
		$this->init();
	}
35
36
	/**
	 * Registers the action handlers with every sync module and with the
	 * Jetpack actions this listener records itself.
	 */
	private function init() {
		$incremental_callback = array( $this, 'action_handler' );
		$full_sync_callback   = array( $this, 'full_sync_action_handler' );

		foreach ( \Jetpack_Sync_Modules::get_modules() as $sync_module ) {
			$sync_module->init_listeners( $incremental_callback );
			$sync_module->init_full_sync_listeners( $full_sync_callback );
		}

		// Module activation and deactivation.
		foreach ( array( 'jetpack_activate_module', 'jetpack_deactivate_module' ) as $hook_name ) {
			add_action( $hook_name, $incremental_callback );
		}

		// Jetpack upgrade: registered with priority 10 and two accepted arguments.
		add_action( 'updating_jetpack_version', $incremental_callback, 10, 2 );

		// Periodic checksum.
		add_action( 'jetpack_sync_checksum', $incremental_callback );
	}
55
56
	/**
	 * Returns the queue that set_defaults() creates with the id 'sync'.
	 *
	 * @return Queue
	 */
	function get_sync_queue() {
		return $this->sync_queue;
	}
59
60
	/**
	 * Returns the queue that set_defaults() creates with the id 'full_sync'.
	 *
	 * @return Queue
	 */
	function get_full_sync_queue() {
		return $this->full_sync_queue;
	}
63
64
	/**
	 * Sets the queue size limit that can_add_to_queue() compares against.
	 *
	 * @param mixed $limit New size limit; stored as given.
	 */
	function set_queue_size_limit( $limit ) {
		$this->sync_queue_size_limit = $limit;
	}
67
68
	/**
	 * Returns the queue size limit currently in effect.
	 *
	 * @return mixed The value last passed to set_queue_size_limit().
	 */
	function get_queue_size_limit() {
		return $this->sync_queue_size_limit;
	}
71
72
	/**
	 * Sets the queue lag limit that can_add_to_queue() compares against.
	 *
	 * @param mixed $age New lag limit; stored as given.
	 */
	function set_queue_lag_limit( $age ) {
		$this->sync_queue_lag_limit = $age;
	}
75
76
	/**
	 * Returns the queue lag limit currently in effect.
	 *
	 * @return mixed The value last passed to set_queue_lag_limit().
	 */
	function get_queue_lag_limit() {
		return $this->sync_queue_lag_limit;
	}
79
80
	/**
	 * Deletes the cached queue-state transient of both queues, so the next
	 * can_add_to_queue() call reads size and lag from the queue again.
	 */
	function force_recheck_queue_limit() {
		foreach ( array( $this->sync_queue, $this->full_sync_queue ) as $queue ) {
			delete_transient( self::QUEUE_STATE_CHECK_TRANSIENT . '_' . $queue->id );
		}
	}
84
85
	/**
	 * Tells whether new items may be added to the given queue.
	 *
	 * Adding is refused when sync is disabled, or when the queue lag has reached
	 * the lag limit AND the queue size (plus the item about to be added) has
	 * reached the size limit. Size and lag are cached in a per-queue transient for
	 * QUEUE_STATE_CHECK_TIMEOUT seconds, so the queue itself is only measured
	 * periodically.
	 *
	 * @param object $queue Queue to check; its id, size() and lag() are used.
	 * @return bool True when items may be added.
	 */
	function can_add_to_queue( $queue ) {
		if ( ! \Jetpack_Sync_Settings::is_sync_enabled() ) {
			return false;
		}

		$transient_key = self::QUEUE_STATE_CHECK_TRANSIENT . '_' . $queue->id;
		$cached_state  = get_transient( $transient_key );

		// Nothing cached (or the cache expired): measure the queue and remember the result.
		if ( false === $cached_state ) {
			$cached_state = array( $queue->size(), $queue->lag() );
			set_transient( $transient_key, $cached_state, self::QUEUE_STATE_CHECK_TIMEOUT );
		}

		list( $size, $lag ) = $cached_state;

		// The lag is still below its limit, so the size does not matter.
		if ( $lag < $this->sync_queue_lag_limit ) {
			return true;
		}

		return ( $size + 1 ) < $this->sync_queue_size_limit;
	}
107
108
	/**
	 * Action callback for full sync hooks: enqueues the current action, with
	 * the arguments it was fired with, on the full sync queue.
	 */
	function full_sync_action_handler() {
		$this->enqueue_action( current_filter(), func_get_args(), $this->full_sync_queue );
	}
112
113
	/**
	 * Action callback for regular sync hooks: enqueues the current action, with
	 * the arguments it was fired with, on the sync queue.
	 */
	function action_handler() {
		$this->enqueue_action( current_filter(), func_get_args(), $this->sync_queue );
	}
117
118
	// add many actions to the queue directly, without invoking them
119
120
	/**
	 * Bulk add actions to the full sync queue.
	 *
	 * @param string $action_name Name of the full sync action.
	 * @param array  $args_array  Chunked arguments; one queue item is built per entry. When an entry
	 *                            has an 'ids' key, that value is used as the action arguments, and an
	 *                            optional 'previous_end' key is appended to the item data.
	 */
	function bulk_enqueue_full_sync_actions( $action_name, $args_array ) {
		$full_sync_queue = $this->get_full_sync_queue();

		// periodically check the size of the queue, and disable adding to it if
		// it exceeds some limit AND the oldest item exceeds the age limit (i.e. sending has stopped)
		if ( ! $this->can_add_to_queue( $full_sync_queue ) ) {
			return;
		}

		// if we add any items to the queue, we should try to ensure that our script
		// can't be killed before they are sent
		if ( function_exists( 'ignore_user_abort' ) ) {
			ignore_user_abort( true );
		}

		// Values shared by every item of this batch.
		$items        = array();
		$user_id      = get_current_user_id();
		$enqueued_at  = microtime( true );
		$is_importing = \Jetpack_Sync_Settings::is_importing();

		foreach ( $args_array as $chunk ) {
			$previous_end = null;
			if ( isset( $chunk['previous_end'] ) ) {
				$previous_end = $chunk['previous_end'];
			}

			if ( isset( $chunk['ids'] ) ) {
				$chunk = $chunk['ids'];
			}

			/**
			 * Modify or reject the data within an action before it is enqueued locally.
			 *
			 * @since 4.2.0
			 *
			 * @module sync
			 *
			 * @param array The action parameters
			 */
			$chunk = apply_filters( "jetpack_sync_before_enqueue_$action_name", $chunk );

			// allow listeners to abort
			if ( false === $chunk ) {
				continue;
			}

			$action_data = array( $chunk );
			if ( null !== $previous_end ) {
				$action_data[] = $previous_end;
			}

			$items[] = array(
				$action_name,
				$action_data,
				$user_id,
				$enqueued_at,
				$is_importing,
			);
		}

		$full_sync_queue->add_all( $items );
	}
180
181
	/**
	 * Enqueues a single action on the given queue.
	 *
	 * Nothing is enqueued while a sync request is being sent, when a
	 * "jetpack_sync_before_enqueue_{$current_filter}" filter returns false, or
	 * when can_add_to_queue() refuses the queue.
	 *
	 * @param string $current_filter Name of the action being recorded.
	 * @param array  $args           Arguments the action was fired with.
	 * @param object $queue          Target queue; its id and add() are used.
	 */
	function enqueue_action( $current_filter, $args, $queue ) {
		// don't enqueue an action during the outbound http request - this prevents recursion
		if ( \Jetpack_Sync_Settings::is_sending() ) {
			return;
		}

		/**
		 * Add an action hook to execute when anything on the whitelist gets sent to the queue to sync.
		 *
		 * @module sync
		 *
		 * @since 5.9.0
		 */
		do_action( 'jetpack_sync_action_before_enqueue' );

		/**
		 * Modify or reject the data within an action before it is enqueued locally.
		 *
		 * @since 4.2.0
		 *
		 * @param array The action parameters
		 */
		$args = apply_filters( "jetpack_sync_before_enqueue_$current_filter", $args );

		// allow listeners to abort
		if ( false === $args ) {
			return;
		}

		// periodically check the size of the queue, and disable adding to it if
		// it exceeds some limit AND the oldest item exceeds the age limit (i.e. sending has stopped)
		if ( ! $this->can_add_to_queue( $queue ) ) {
			return;
		}

		// if we add any items to the queue, we should try to ensure that our script
		// can't be killed before they are sent
		if ( function_exists( 'ignore_user_abort' ) ) {
			ignore_user_abort( true );
		}

		$item = array(
			$current_filter,
			$args,
			get_current_user_id(),
			microtime( true ),
			\Jetpack_Sync_Settings::is_importing(),
		);

		// The actor is attached to every item of the 'sync' queue, and to the
		// full sync start/end/cancel actions whatever queue they go to.
		$full_sync_actions_with_actor = array(
			'jetpack_full_sync_start',
			'jetpack_full_sync_end',
			'jetpack_full_sync_cancel',
		);

		// Strict comparison, so that only an exact action name matches.
		if ( 'sync' === $queue->id || in_array( $current_filter, $full_sync_actions_with_actor, true ) ) {
			$item[] = $this->get_actor( $current_filter, $args );
		}

		$queue->add( $item );

		// since we've added some items, let's try to load the sender so we can send them as quickly as possible
		// NOTE(review): static analysis reports that Jetpack_Sync_Actions::$sender is declared private and
		// cannot be accessed from this context. Confirm its visibility in Jetpack_Sync_Actions; if it is
		// private, raise the visibility or read it through an accessible getter.
		if ( ! \Jetpack_Sync_Actions::$sender ) {
			add_filter( 'jetpack_sync_sender_should_load', '__return_true' );
			if ( did_action( 'init' ) ) {
				\Jetpack_Sync_Actions::add_sender_shutdown();
			}
		}
	}
263
264
	/**
	 * Builds the actor record attached to an enqueued action: which user
	 * triggered it and through what kind of request.
	 *
	 * @param string $current_filter Name of the action being recorded.
	 * @param array  $args           Arguments of the action. For 'wp_login' the user is read from $args[1].
	 * @return array Actor data; 'ip' and 'user_agent' are only present when
	 *               should_send_user_data_with_actor() allows it.
	 */
	function get_actor( $current_filter, $args ) {
		$user = ( 'wp_login' === $current_filter )
			? get_user_by( 'ID', $args[1]->data->ID )
			: wp_get_current_user();

		$translated_role = \Jetpack::translate_user_to_role( $user );

		// Every key is created up front so the order of the record stays fixed.
		$actor = array(
			'wpcom_user_id'    => null,
			'external_user_id' => null,
			'display_name'     => null,
			'user_email'       => null,
			'user_roles'       => null,
			'translated_role'  => $translated_role ? $translated_role : null,
			'is_cron'          => false,
			'is_rest'          => false,
			'is_xmlrpc'        => false,
			'is_wp_rest'       => false,
			'is_ajax'          => false,
			'is_wp_admin'      => is_admin(),
			'is_cli'           => false,
			'from_url'         => $this->get_request_url(),
		);

		// Copy the user properties that are set; the others stay null.
		$user_properties = array(
			'external_user_id' => 'ID',
			'display_name'     => 'display_name',
			'user_email'       => 'user_email',
			'user_roles'       => 'roles',
		);
		foreach ( $user_properties as $actor_key => $property ) {
			if ( isset( $user->$property ) ) {
				$actor[ $actor_key ] = $user->$property;
			}
		}

		// Request-type flags take the value of their constant when it is defined.
		$request_constants = array(
			'is_cron'    => 'DOING_CRON',
			'is_rest'    => 'REST_API_REQUEST',
			'is_xmlrpc'  => 'XMLRPC_REQUEST',
			'is_wp_rest' => 'REST_REQUEST',
			'is_ajax'    => 'DOING_AJAX',
			'is_cli'     => 'WP_CLI',
		);
		foreach ( $request_constants as $actor_key => $constant_name ) {
			if ( defined( $constant_name ) ) {
				$actor[ $actor_key ] = constant( $constant_name );
			}
		}

		if ( $this->should_send_user_data_with_actor( $current_filter ) ) {
			require_once JETPACK__PLUGIN_DIR . 'modules/protect/shared-functions.php';
			$actor['ip'] = jetpack_protect_get_ip();

			$actor['user_agent'] = 'unknown';
			if ( isset( $_SERVER['HTTP_USER_AGENT'] ) ) {
				$actor['user_agent'] = $_SERVER['HTTP_USER_AGENT'];
			}
		}

		return $actor;
	}
298
299
	/**
	 * Decides whether the actor's IP address and user agent are sent with an action.
	 *
	 * By default this is the case for the login, logout and failed login
	 * actions only; the 'jetpack_sync_actor_user_data' filter can override it.
	 *
	 * @param string $current_filter Name of the action being recorded.
	 * @return mixed The filtered decision; a boolean unless a filter callback returns something else.
	 */
	function should_send_user_data_with_actor( $current_filter ) {
		$actions_with_user_data = array(
			'jetpack_wp_login',
			'wp_logout',
			'jetpack_valid_failed_login_attempt',
		);

		// Strict comparison: only an exact action name may enable sending IP and user agent.
		$should_send = in_array( $current_filter, $actions_with_user_data, true );

		/**
		 * Allow or deny sending actor's user data ( IP and UA ) during a sync event
		 *
		 * @since 5.8.0
		 *
		 * @module sync
		 *
		 * @param bool True if we should send user data
		 * @param string The current filter that is performing the sync action
		 */
		return apply_filters( 'jetpack_sync_actor_user_data', $should_send, $current_filter );
	}
313
314
	/**
	 * Creates the 'sync' and 'full_sync' queues and loads the queue limits
	 * from the 'max_queue_size' and 'max_queue_lag' sync settings.
	 */
	function set_defaults() {
		$this->sync_queue      = new Queue( 'sync' );
		$this->full_sync_queue = new Queue( 'full_sync' );

		$max_queue_size = \Jetpack_Sync_Settings::get_setting( 'max_queue_size' );
		$this->set_queue_size_limit( $max_queue_size );

		$max_queue_lag = \Jetpack_Sync_Settings::get_setting( 'max_queue_lag' );
		$this->set_queue_lag_limit( $max_queue_lag );
	}
320
321
	/**
	 * Returns the URL of the current request.
	 *
	 * When the server provides both the host and the request URI, the URL is
	 * built from them. Otherwise the admin URL of the current blog (in the
	 * admin) or the home URL is returned.
	 *
	 * @return string
	 */
	function get_request_url() {
		if ( isset( $_SERVER['HTTP_HOST'], $_SERVER['REQUEST_URI'] ) ) {
			// HTTPS can be present without the request being secure: it is "off" under
			// IIS (ISAPI) for plain HTTP requests, so its value has to be checked.
			$is_https = ! empty( $_SERVER['HTTPS'] ) && 'off' !== strtolower( (string) $_SERVER['HTTPS'] );
			$scheme   = $is_https ? 'https' : 'http';

			return $scheme . '://' . "{$_SERVER['HTTP_HOST']}{$_SERVER['REQUEST_URI']}";
		}

		return is_admin() ? get_admin_url( get_current_blog_id() ) : home_url();
	}
327
}
328