1 | <?php |
||
12 | class Jetpack_Sync_Sender { |
||
13 | |||
14 | const SYNC_THROTTLE_OPTION_NAME = 'jetpack_sync_min_wait'; |
||
15 | const LAST_SYNC_TIME_OPTION_NAME = 'jetpack_last_sync_time'; |
||
16 | |||
17 | private $dequeue_max_bytes; |
||
18 | private $upload_max_bytes; |
||
19 | private $upload_max_rows; |
||
20 | private $sync_wait_time; |
||
21 | private $sync_queue; |
||
22 | private $codec; |
||
23 | |||
24 | // singleton functions |
||
25 | private static $instance; |
||
26 | |||
27 | public static function get_instance() { |
||
28 | if ( null === self::$instance ) { |
||
29 | self::$instance = new self(); |
||
30 | } |
||
31 | |||
32 | return self::$instance; |
||
33 | } |
||
34 | |||
35 | // this is necessary because you can't use "new" when you declare instance properties >:( |
||
36 | protected function __construct() { |
||
37 | $this->set_defaults(); |
||
38 | $this->init(); |
||
39 | } |
||
40 | |||
41 | private function init() { |
||
42 | |||
43 | foreach ( Jetpack_Sync_Modules::get_modules() as $module ) { |
||
44 | $module->init_before_send(); |
||
45 | } |
||
46 | |||
47 | /** |
||
48 | * Sync all pending actions with server |
||
49 | */ |
||
50 | add_action( 'jetpack_sync_actions', array( $this, 'do_sync' ) ); |
||
51 | } |
||
52 | |||
53 | public function do_sync() { |
||
54 | // don't sync if importing |
||
55 | if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) { |
||
56 | $this->schedule_sync( '+1 minute' ); |
||
57 | |||
58 | return false; |
||
59 | } |
||
60 | |||
61 | // don't sync if we are throttled |
||
62 | $sync_wait = $this->get_sync_wait_time(); |
||
63 | $last_sync = $this->get_last_sync_time(); |
||
64 | |||
65 | if ( $last_sync && $sync_wait && $last_sync + $sync_wait > microtime( true ) ) { |
||
66 | return false; |
||
67 | } |
||
68 | |||
69 | $this->set_last_sync_time(); |
||
70 | |||
71 | do_action( 'jetpack_sync_before_send' ); |
||
72 | |||
73 | if ( $this->sync_queue->size() === 0 ) { |
||
74 | return false; |
||
75 | } |
||
76 | |||
77 | // now that we're sure we are about to sync, try to |
||
78 | // ignore user abort so we can avoid getting into a |
||
79 | // bad state |
||
80 | if ( function_exists( 'ignore_user_abort' ) ) { |
||
81 | ignore_user_abort( true ); |
||
82 | } |
||
83 | |||
84 | $buffer = $this->sync_queue->checkout_with_memory_limit( $this->dequeue_max_bytes, $this->upload_max_rows ); |
||
85 | |||
86 | if ( ! $buffer ) { |
||
87 | // buffer has no items |
||
88 | return false; |
||
89 | } |
||
90 | |||
91 | if ( is_wp_error( $buffer ) ) { |
||
92 | // another buffer is currently sending |
||
93 | return false; |
||
94 | } |
||
95 | |||
96 | $upload_size = 0; |
||
97 | $items_to_send = array(); |
||
98 | $items = $buffer->get_items(); |
||
99 | |||
100 | // we estimate the total encoded size as we go by encoding each item individually |
||
101 | // this is expensive, but the only way to really know :/ |
||
102 | foreach ( $items as $key => $item ) { |
||
103 | /** |
||
104 | * Modify the data within an action before it is serialized and sent to the server |
||
105 | * For example, during full sync this expands Post ID's into full Post objects, |
||
106 | * so that we don't have to serialize the whole object into the queue. |
||
107 | * |
||
108 | * @since 4.2.0 |
||
109 | * |
||
110 | * @param array The action parameters |
||
111 | */ |
||
112 | $item[1] = apply_filters( 'jetpack_sync_before_send_' . $item[0], $item[1], $item[2] ); |
||
113 | |||
114 | $encoded_item = $this->codec->encode( $item ); |
||
115 | |||
116 | $upload_size += strlen( $encoded_item ); |
||
117 | |||
118 | if ( $upload_size > $this->upload_max_bytes && count( $items_to_send ) > 0 ) { |
||
119 | break; |
||
120 | } |
||
121 | |||
122 | $items_to_send[ $key ] = $encoded_item; |
||
123 | } |
||
124 | |||
125 | /** |
||
126 | * Fires when data is ready to send to the server. |
||
127 | * Return false or WP_Error to abort the sync (e.g. if there's an error) |
||
128 | * The items will be automatically re-sent later |
||
129 | * |
||
130 | * @since 4.2.0 |
||
131 | * |
||
132 | * @param array $data The action buffer |
||
133 | */ |
||
134 | $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->codec->name(), microtime( true ) ); |
||
135 | |||
136 | if ( ! $processed_item_ids || is_wp_error( $processed_item_ids ) ) { |
||
137 | $processed_item_ids = $this->sync_queue->checkin( $buffer ); |
||
138 | |||
139 | if ( is_wp_error( $processed_item_ids ) ) { |
||
140 | error_log( 'Error checking in buffer: ' . $processed_item_ids->get_error_message() ); |
||
141 | $this->sync_queue->force_checkin(); |
||
142 | } |
||
143 | // try again in 1 minute |
||
144 | $this->schedule_sync( '+1 minute' ); |
||
145 | } else { |
||
146 | $processed_items = array_intersect_key( $items, array_flip( $processed_item_ids ) ); |
||
147 | |||
148 | /** |
||
149 | * Allows us to keep track of all the actions that have been sent. |
||
150 | * Allows us to calculate the progress of specific actions. |
||
151 | * |
||
152 | * @since 4.2.0 |
||
153 | * |
||
154 | * @param array $processed_actions The actions that we send successfully. |
||
155 | */ |
||
156 | do_action( 'jetpack_sync_processed_actions', $processed_items ); |
||
157 | |||
158 | $this->sync_queue->close( $buffer, $processed_item_ids ); |
||
159 | // check if there are any more events in the buffer |
||
160 | // if so, schedule a cron job to happen soon |
||
161 | if ( $this->sync_queue->has_any_items() ) { |
||
162 | $this->schedule_sync( '+1 minute' ); |
||
163 | } |
||
164 | } |
||
165 | } |
||
166 | |||
167 | private function schedule_sync( $when ) { |
||
168 | wp_schedule_single_event( strtotime( $when ), 'jetpack_sync_actions' ); |
||
169 | } |
||
170 | |||
171 | function get_sync_queue() { |
||
172 | return $this->sync_queue; |
||
173 | } |
||
174 | |||
175 | function get_codec() { |
||
176 | return $this->codec; |
||
177 | } |
||
178 | |||
179 | function send_checksum() { |
||
180 | require_once 'class.jetpack-sync-wp-replicastore.php'; |
||
181 | $store = new Jetpack_Sync_WP_Replicastore(); |
||
182 | do_action( 'jetpack_sync_checksum', $store->checksum_all() ); |
||
183 | } |
||
184 | |||
185 | function reset_sync_queue() { |
||
186 | Jetpack_Sync_Modules::get_module( 'full-sync' )->clear_status(); |
||
187 | $this->sync_queue->reset(); |
||
188 | } |
||
189 | |||
190 | function set_dequeue_max_bytes( $size ) { |
||
193 | |||
194 | // in bytes |
||
195 | function set_upload_max_bytes( $max_bytes ) { |
||
198 | |||
199 | // in rows |
||
200 | function set_upload_max_rows( $max_rows ) { |
||
203 | |||
204 | // in seconds |
||
205 | function set_sync_wait_time( $seconds ) { |
||
208 | |||
209 | function get_sync_wait_time() { |
||
212 | |||
213 | private function get_last_sync_time() { |
||
216 | |||
217 | private function set_last_sync_time() { |
||
220 | |||
221 | function set_defaults() { |
||
232 | |||
233 | function reset_data() { |
||
245 | |||
246 | function uninstall() { |
||
259 | } |
||
260 |