Automattic /
jetpack
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | |||
| 3 | namespace Automattic\Jetpack\Sync; |
||
| 4 | |||
| 5 | /** |
||
| 6 | * A persistent queue that can be flushed in increments of N items, |
||
| 7 | * and which blocks reads until checked-out buffers are checked in or |
||
| 8 | * closed. This uses raw SQL for two reasons: speed, and not triggering |
||
| 9 | * tons of added_option callbacks. |
||
| 10 | */ |
||
| 11 | class Queue { |
||
/**
 * Queue identifier used when building option names. Dashes in the value
 * passed to the constructor are replaced with underscores.
 *
 * @var string
 */
public $id;

/**
 * Counter bumped every time a data-row option name is generated.
 *
 * @var int
 */
private $row_iterator;

/**
 * Random integer chosen once per instance and embedded in generated option names.
 *
 * Declared explicitly so that the constructor does not create a dynamic
 * property (deprecated as of PHP 8.2). It stays public because the
 * previously undeclared property was publicly accessible.
 *
 * @var int
 */
public $random_int;

/**
 * Sets up a queue handle for the given identifier.
 *
 * @param string $id Queue identifier; dashes are converted to underscores.
 */
function __construct( $id ) {
	$this->id           = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
	$this->row_iterator = 0;
	$this->random_int   = mt_rand( 1, 1000000 );
}
||
| 20 | |||
/**
 * Appends a single item to the queue as a non-autoloaded option row.
 *
 * The INSERT is repeated, each time with a freshly generated option name,
 * for as long as the query reports exactly 0 rows. Any other result ends
 * the loop.
 *
 * @param mixed $item Value to enqueue; it is stored serialized.
 */
function add( $item ) {
	global $wpdb;

	do {
		// A new (microtime-based) option name is generated on every attempt.
		$insert_sql = $wpdb->prepare(
			"INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
			$this->get_next_data_row_option_name(),
			serialize( $item ),
			'no'
		);

		$insert_result = $wpdb->query( $insert_sql );
	} while ( 0 === $insert_result );
}
||
| 38 | |||
/**
 * Attempts to insert all the items in a single SQL query.
 * May be subject to query size limits!
 *
 * Every row shares one base option name and is distinguished by a
 * "-<position>" suffix, where position counts from 0 in iteration order.
 *
 * @param array $items Items to enqueue; each one is stored serialized.
 *
 * @return \WP_Error|null A WP_Error when the number of inserted rows differs
 *                        from the number of items, otherwise nothing.
 */
function add_all( $items ) {
	global $wpdb;

	// Nothing to do; an INSERT with an empty VALUES list would be invalid SQL.
	if ( empty( $items ) ) {
		return;
	}

	$base_option_name = $this->get_next_data_row_option_name();

	$query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";

	$rows = array();

	// Iterate by value with our own counter so that arrays whose keys are
	// not a contiguous 0..n-1 sequence are handled as well.
	$position = 0;
	foreach ( $items as $item ) {
		$option_name  = esc_sql( $base_option_name . '-' . $position );
		$option_value = esc_sql( serialize( $item ) );
		$rows[]       = "('$option_name', '$option_value', 'no')";
		$position    += 1;
	}

	$rows_added = $wpdb->query( $query . join( ',', $rows ) );

	// Report an error only when the counts differ (the check used to be inverted).
	if ( count( $items ) !== $rows_added ) {
		return new \WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
	}
}
||
| 60 | |||
/**
 * Looks at the front-most item(s) of the queue without checking them out.
 *
 * @param int $count Maximum number of items to look at.
 *
 * @return array Result of Utils::get_item_values() for the fetched rows,
 *               or an empty array when nothing was fetched.
 */
function peek( $count = 1 ) {
	$front_rows = $this->fetch_items( $count );

	if ( ! $front_rows ) {
		return array();
	}

	return Utils::get_item_values( $front_rows );
}
||
| 70 | |||
/**
 * Lag is the difference in time between the age of the oldest item
 * (aka first or frontmost item) and the current time.
 *
 * The age is read from the timestamp embedded in the option name of the
 * first row, ordered by option name.
 *
 * @param float|null $now Reference time in seconds; defaults to microtime( true ).
 *
 * @return float|int Seconds between $now and the first item's timestamp, or 0
 *                   when the queue has no rows or the name cannot be parsed.
 */
function lag( $now = null ) {
	global $wpdb;

	$first_item_name = $wpdb->get_var(
		$wpdb->prepare(
			"SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
			"jpsq_{$this->id}-%"
		)
	);

	if ( ! $first_item_name ) {
		return 0;
	}

	if ( null === $now ) {
		$now = microtime( true );
	}

	// Break apart the item name to get the timestamp. The id is quoted so
	// that any regex metacharacters (or the delimiter) in it match literally.
	$matches = null;
	$pattern = '/^jpsq_' . preg_quote( $this->id, '/' ) . '-(\d+\.\d+)-/';

	if ( preg_match( $pattern, $first_item_name, $matches ) ) {
		return $now - floatval( $matches[1] );
	}

	return 0;
}
||
| 99 | |||
/**
 * Clears the checkout marker and then deletes every option row whose
 * name matches this queue's data-row pattern.
 */
function reset() {
	global $wpdb;

	$this->delete_checkout_id();

	$delete_sql = $wpdb->prepare(
		"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
		"jpsq_{$this->id}-%"
	);

	$wpdb->query( $delete_sql );
}
||
| 110 | |||
/**
 * Counts the option rows whose name matches this queue's data-row pattern.
 *
 * @return int
 */
function size() {
	global $wpdb;

	$count_sql = $wpdb->prepare(
		"SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s",
		"jpsq_{$this->id}-%"
	);

	$row_count = $wpdb->get_var( $count_sql );

	return (int) $row_count;
}
||
| 121 | |||
/**
 * Tells whether at least one data row exists for this queue.
 *
 * An EXISTS sub-select is used instead of count(*) because, per the
 * original author's note, it is much faster.
 *
 * @return bool True only when the query yields the string '1'.
 */
function has_any_items() {
	global $wpdb;

	$exists_sql = $wpdb->prepare(
		"SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )",
		"jpsq_{$this->id}-%"
	);

	return '1' === $wpdb->get_var( $exists_sql );
}
||
| 134 | |||
/**
 * Checks out up to $buffer_size items from the front of the queue.
 *
 * A checkout marker is written first; while it is in place further
 * checkouts are refused with an 'unclosed_buffer' error.
 *
 * @param int $buffer_size Maximum number of items to put in the buffer.
 *
 * @return Queue_Buffer|\WP_Error|bool|int A buffer holding the items; a WP_Error
 *         when another buffer is still checked out; the falsy/WP_Error result
 *         of writing the marker when that fails; or false when the queue is empty.
 */
function checkout( $buffer_size ) {
	if ( $this->get_checkout_id() ) {
		return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
	}

	$buffer_id = uniqid();

	$result = $this->set_checkout_id( $buffer_id );

	if ( ! $result || is_wp_error( $result ) ) {
		return $result;
	}

	$items = $this->fetch_items( $buffer_size );

	if ( count( $items ) === 0 ) {
		// No buffer is handed back, so nobody could check it in or close it.
		// Release the marker right away, otherwise every later checkout would
		// fail with 'unclosed_buffer' until the marker expires.
		$this->delete_checkout_id();

		return false;
	}

	$buffer = new Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );

	return $buffer;
}
||
| 158 | |||
/**
 * Checks out rows until it either empties the queue, reaches
 * $max_buffer_size rows, or hits a certain memory limit.
 *
 * It loads the sizes from the DB first so that it doesn't accidentally
 * load more data into memory than it needs to. The only way it will load
 * more than $max_memory is if the first queue item alone exceeds the
 * limit, in which case that item is sent by itself.
 *
 * @param int $max_memory      Upper bound on the summed LENGTH(option_value) of the rows.
 * @param int $max_buffer_size Maximum number of rows to consider.
 *
 * @return Queue_Buffer|\WP_Error|bool|int A buffer holding the items; a WP_Error
 *         when another buffer is still checked out; the falsy/WP_Error result
 *         of writing the marker when that fails; or false when no rows were found.
 */
function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
	if ( $this->get_checkout_id() ) {
		return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
	}

	$buffer_id = uniqid();

	$result = $this->set_checkout_id( $buffer_id );

	if ( ! $result || is_wp_error( $result ) ) {
		return $result;
	}

	// get the map of buffer_id -> memory_size
	global $wpdb;

	$items_with_size = $wpdb->get_results(
		$wpdb->prepare(
			"SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
			"jpsq_{$this->id}-%",
			$max_buffer_size
		),
		OBJECT
	);

	if ( count( $items_with_size ) === 0 ) {
		// Release the marker: no buffer is returned, so the caller has
		// nothing to check in or close. This mirrors the cleanup done
		// further down when the second query comes back empty.
		$this->delete_checkout_id();

		return false;
	}

	$total_memory = 0;

	$min_item_id = $max_item_id = $items_with_size[0]->id;

	foreach ( $items_with_size as $id => $item_with_size ) {
		$total_memory += $item_with_size->value_size;

		// if this is the first item and it exceeds memory, allow loop to continue
		// we will exit on the next iteration instead
		if ( $total_memory > $max_memory && $id > 0 ) {
			break;
		}

		$max_item_id = $item_with_size->id;
	}

	// Load the actual values for the name range selected above.
	$query = $wpdb->prepare(
		"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
		$min_item_id,
		$max_item_id
	);

	$items = $wpdb->get_results( $query, OBJECT );
	foreach ( $items as $item ) {
		$item->value = maybe_unserialize( $item->value );
	}

	if ( count( $items ) === 0 ) {
		$this->delete_checkout_id();

		return false;
	}

	$buffer = new Queue_Buffer( $buffer_id, $items );

	return $buffer;
}
||
| 230 | |||
/**
 * Returns a checked-out buffer to the queue without removing its items.
 *
 * @param Queue_Buffer $buffer Buffer previously obtained from a checkout.
 *
 * @return true|\WP_Error True once the checkout marker has been cleared, or
 *                        the WP_Error produced by validation.
 */
function checkin( $buffer ) {
	$validation = $this->validate_checkout( $buffer );

	if ( ! is_wp_error( $validation ) ) {
		$this->delete_checkout_id();

		return true;
	}

	return $validation;
}
||
| 242 | |||
/**
 * Closes a checked-out buffer: clears the checkout marker and deletes
 * rows from the options table.
 *
 * @param Queue_Buffer $buffer        Buffer previously obtained from a checkout.
 * @param array|null   $ids_to_remove Option names to delete; when null, all
 *                                    ids reported by the buffer are deleted.
 *
 * @return true|\WP_Error True on completion, or the WP_Error produced by validation.
 */
function close( $buffer, $ids_to_remove = null ) {
	global $wpdb;

	$validation = $this->validate_checkout( $buffer );

	if ( is_wp_error( $validation ) ) {
		return $validation;
	}

	$this->delete_checkout_id();

	// by default clear all items in the buffer
	if ( is_null( $ids_to_remove ) ) {
		$ids_to_remove = $buffer->get_item_ids();
	}

	$id_count = count( $ids_to_remove );

	if ( $id_count > 0 ) {
		// One %s placeholder per id, filled in by $wpdb->prepare().
		$placeholders = implode( ', ', array_fill( 0, $id_count, '%s' ) );
		$sql          = "DELETE FROM $wpdb->options WHERE option_name IN (" . $placeholders . ')';
		$prepare_args = array_merge( array( $sql ), $ids_to_remove );

		$wpdb->query( call_user_func_array( array( $wpdb, 'prepare' ), $prepare_args ) );
	}

	return true;
}
||
| 267 | |||
/**
 * Reads every item in the queue, then resets the queue.
 *
 * @return array Result of Utils::get_item_values() for the rows that were
 *               fetched before the reset.
 */
function flush_all() {
	$all_rows = $this->fetch_items();
	$values   = Utils::get_item_values( $all_rows );

	$this->reset();

	return $values;
}
||
| 274 | |||
/**
 * Fetches every row of the queue, without a limit.
 *
 * @return array Row objects as returned by fetch_items().
 */
function get_all() {
	$all_rows = $this->fetch_items();

	return $all_rows;
}
||
| 278 | |||
/**
 * Clears the checkout marker unconditionally, without validating a buffer.
 *
 * Use with caution: this could allow multiple processes to delete and
 * send from the queue at the same time.
 */
function force_checkin() {
	$this->delete_checkout_id();
}
||
| 284 | |||
/**
 * Used to lock checkouts from the queue.
 *
 * Polls once per second, for up to $timeout seconds, until the queue is
 * empty, then writes a checkout marker with the id 'lock'.
 *
 * @param int $timeout Maximum number of one-second waits before giving up.
 *
 * @return true|\WP_Error|bool|int True when the lock was written; a
 *         'lock_timeout' WP_Error when the queue still has items after
 *         $timeout waits; an 'unclosed_buffer' WP_Error when a checkout is
 *         active; or the falsy/WP_Error result of writing the marker.
 */
function lock( $timeout = 30 ) {
	$tries = 0;

	while ( $this->has_any_items() ) {
		// Compare against the caller's $timeout rather than a fixed 30, and
		// only give up while items are actually still present.
		if ( $tries >= $timeout ) {
			return new \WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
		}

		sleep( 1 );
		$tries += 1;
	}

	if ( $this->get_checkout_id() ) {
		return new \WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
	}

	// hopefully this means we can acquire a checkout?
	$result = $this->set_checkout_id( 'lock' );

	if ( ! $result || is_wp_error( $result ) ) {
		return $result;
	}

	return true;
}
||
| 312 | |||
/**
 * Releases the lock by clearing the checkout marker.
 *
 * @return mixed Whatever delete_checkout_id() returns.
 */
function unlock() {
	$cleared = $this->delete_checkout_id();

	return $cleared;
}
||
| 316 | |||
/**
 * Produces the timestamp portion of a data-row option name: the current
 * microtime formatted with six decimal places.
 *
 * This format is specifically chosen to, as much as possible, preserve
 * time order and minimise the possibility of collisions between multiple
 * processes working at the same time.
 *
 * @return string
 */
protected function generate_option_name_timestamp() {
	$current_time = microtime( true );

	return sprintf( '%.6f', $current_time );
}
||
| 327 | |||
/**
 * Reads the checkout marker and returns its id if it has not expired.
 *
 * The stored value has the form "<checkout id>:<expiry timestamp>".
 *
 * @return string|false The checkout id while the expiry lies in the future,
 *                      false when there is no marker or it has expired.
 */
private function get_checkout_id() {
	global $wpdb;

	$lookup_sql = $wpdb->prepare(
		"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
		$this->get_lock_option_name()
	);

	$stored_marker = $wpdb->get_var( $lookup_sql );

	if ( ! $stored_marker ) {
		return false;
	}

	$marker_parts = explode( ':', $stored_marker );
	$checkout_id  = $marker_parts[0];
	$expires_at   = $marker_parts[1];

	return intval( $expires_at ) > time() ? $checkout_id : false;
}
||
| 346 | |||
/**
 * Writes the checkout marker as "<checkout id>:<expiry timestamp>".
 *
 * The expiry is the current time plus the default sync queue lock timeout.
 * An UPDATE of the existing row is tried first; when that yields a falsy
 * result the row is INSERTed instead.
 *
 * @param string $checkout_id Identifier to store in the marker.
 *
 * @return mixed Result of the last query that was run.
 */
private function set_checkout_id( $checkout_id ) {
	global $wpdb;

	$expires      = time() + \Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
	$marker_value = "$checkout_id:$expires";
	$option_name  = $this->get_lock_option_name();

	$update_sql = $wpdb->prepare(
		"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
		$marker_value,
		$option_name
	);

	$affected = $wpdb->query( $update_sql );

	if ( $affected ) {
		return $affected;
	}

	$insert_sql = $wpdb->prepare(
		"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
		$option_name,
		$marker_value
	);

	return $wpdb->query( $insert_sql );
}
||
| 371 | |||
/**
 * Clears the checkout marker by overwriting its value with '0:0'.
 *
 * Rather than delete the row, which causes fragmentation, it is updated in place.
 *
 * @return mixed Result of the UPDATE query.
 */
private function delete_checkout_id() {
	global $wpdb;

	$clear_sql = $wpdb->prepare(
		"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
		'0:0',
		$this->get_lock_option_name()
	);

	return $wpdb->query( $clear_sql );
}
||
| 384 | |||
/**
 * Builds the option name under which the checkout marker is stored.
 *
 * @return string
 */
private function get_lock_option_name() {
	return 'jpsq_' . $this->id . '_checkout';
}
||
| 388 | |||
/**
 * Generates the option name for the next data row:
 * "jpsq_<id>-<timestamp>-<random int>-<row iterator>".
 *
 * @return string
 */
private function get_next_data_row_option_name() {
	$timestamp = $this->generate_option_name_timestamp();

	// row iterator is used to avoid collisions where we're writing data waaay fast in a single process;
	// it wraps back to 0 once it reaches PHP_INT_MAX
	$this->row_iterator = ( PHP_INT_MAX === $this->row_iterator ) ? 0 : $this->row_iterator + 1;

	$name_parts = array(
		'jpsq_' . $this->id,
		$timestamp,
		$this->random_int,
		$this->row_iterator,
	);

	return implode( '-', $name_parts );
}
||
| 401 | |||
/**
 * Loads queue rows ordered by option name, unserializing each value.
 *
 * @param int|null $limit Maximum number of rows; a falsy value means no limit.
 *
 * @return array Row objects with `id` (option name) and `value` properties.
 */
private function fetch_items( $limit = null ) {
	global $wpdb;

	$name_pattern = "jpsq_{$this->id}-%";
	$select_sql   = "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC";

	// Only append a LIMIT clause when a limit was requested.
	$query_sql = $limit
		? $wpdb->prepare( $select_sql . ' LIMIT %d', $name_pattern, $limit )
		: $wpdb->prepare( $select_sql, $name_pattern );

	$rows = $wpdb->get_results( $query_sql, OBJECT );

	foreach ( $rows as $row ) {
		$row->value = maybe_unserialize( $row->value );
	}

	return $rows;
}
||
| 418 | |||
/**
 * Verifies that $buffer is the buffer that is currently checked out.
 *
 * @param mixed $buffer Value handed to checkin() or close().
 *
 * @return true|\WP_Error True when the buffer matches the active checkout;
 *         otherwise a WP_Error with code 'not_a_buffer',
 *         'buffer_not_checked_out' or 'buffer_mismatch'.
 */
private function validate_checkout( $buffer ) {
	if ( ! $buffer instanceof Queue_Buffer ) {
		return new \WP_Error( 'not_a_buffer', 'You must checkin an instance of Automattic\\Jetpack\\Sync\\Queue_Buffer' );
	}

	$checkout_id = $this->get_checkout_id();

	if ( ! $checkout_id ) {
		return new \WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
	}

	// Compare as strings with strict equality. A loose comparison treats
	// numeric-looking ids (for example "1e3" and "1000") as equal even
	// though they identify different buffers.
	if ( (string) $checkout_id !== (string) $buffer->id ) {
		return new \WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
	}

	return true;
}
||
| 436 | } |
||
| 437 |
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is WordPress.
In this case you can add the
@ignorePhpDoc annotation to the duplicate definition and it will be ignored.