Test Failed
Pull Request — master (#47)
by Michael
02:39
created

CurlMultiSender::reset()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
namespace Apple\ApnPush\Protocol\Http\Sender;
4
5
use Apple\ApnPush\Protocol\Http\Request;
6
use Apple\ApnPush\Protocol\Http\Response;
7
use RuntimeException;
8
9
/**
10
 * Class CurlMultiSender
11
 *
12
 * @package Apple\ApnPush\Protocol\Http\Sender
13
 */
14
class CurlMultiSender implements HttpSenderInterface
15
{
16
    /**
17
     * The curl version used
18
     *
19
     * @var string
20
     */
21
    private $curlVersion;
22
23
    /**
24
     * Max number of simultaneous connections allowed
25
     *
26
     * @var int
27
     */
28
    private $maxConcurrentRequests = 10;
29
30
    /**
31
     * Global timeout, in milliseconds, within which every request must complete
32
     *
33
     * @var int
34
     */
35
    private $timeout = 5000;
36
37
    /**
38
     * The request queue
39
     *
40
     * @var array
41
     */
42
    private $requests = [];
43
44
    /**
     * Create a sender with the given concurrency limit and timeout.
     *
     * Both values are routed through the fluent setters, then the version of
     * the installed cURL library is cached for later option building.
     *
     * @param int $maxRequests Maximum number of simultaneous requests
     * @param int $timeout     Global request timeout in milliseconds
     */
    public function __construct($maxRequests = 10, $timeout = 5000)
    {
        $this->maxRequests($maxRequests)->timeout($timeout);

        $versionInfo = curl_version();
        $this->curlVersion = $versionInfo['version'];
    }
57
58
    /**
     * Configure how many requests may be in flight at the same time.
     *
     * @param int $maxRequests Upper bound of concurrent requests
     *
     * @return HttpSenderInterface This sender, to allow call chaining
     */
    public function maxRequests(int $maxRequests): HttpSenderInterface
    {
        $this->maxConcurrentRequests = $maxRequests;

        return $this;
    }
71
72
    /**
     * Configure the timeout applied to every request.
     *
     * @param int $timeout Timeout in milliseconds
     *
     * @return HttpSenderInterface This sender, to allow call chaining
     */
    public function timeout(int $timeout): HttpSenderInterface
    {
        $this->timeout = $timeout;

        return $this;
    }
85
86
    /**
     * Queue a request; nothing is sent until send() is called.
     *
     * The headers are flattened into "Name: value" lines, and the client
     * certificate options are recorded only when the request carries a
     * certificate.
     *
     * @param Request  $request  Request to enqueue
     * @param callable $callback Invoked with the Response once the request completes
     *
     * @return void
     */
    public function addRequest(Request $request, callable $callback): void
    {
        $headerLines = [];
        $curlOptions = [];

        foreach ($request->getHeaders() as $headerName => $headerValue) {
            $headerLines[] = sprintf('%s: %s', $headerName, $headerValue);
        }

        if ($request->getCertificate()) {
            $curlOptions[CURLOPT_SSLCERT] = $request->getCertificate();
            $curlOptions[CURLOPT_SSLCERTPASSWD] = $request->getCertificatePassPhrase();
        }

        $queuedRequest = [
            'url'       => $request->getUrl(),
            'post_data' => $request->getContent(),
            'callback'  => $callback,
            'options'   => $curlOptions,
            'headers'   => $headerLines,
        ];

        $this->requests[] = $queuedRequest;
    }
116
117
    /**
     * Empty the request queue and close the given cURL multi handle.
     *
     * @param $multiCurlHandle The cURL multi handle to close
     */
    private function reset($multiCurlHandle)
    {
        // Drop every queued request first, then release the multi handle.
        $this->requests = [];

        curl_multi_close($multiCurlHandle);
    }
127
128
    /**
     * Execute the request queue.
     *
     * Starts up to the configured number of concurrent requests, then keeps
     * driving the cURL multi handle; each time a transfer completes its
     * callback is invoked and further queued requests are started while the
     * concurrency limit allows it.
     *
     * The queue is cleared and the multi handle is closed in every case,
     * including when a request cannot be initialised or a callback throws, so
     * a later send() never re-sends requests that were already processed.
     *
     * @return void
     *
     * @throws RuntimeException When a request cannot be initialised
     */
    public function send(): void
    {
        $active = null;
        $requestsMap = [];
        $requestsRunning = 0;
        $multiHandle = curl_multi_init();

        try {
            // Index of the next queued request that has not been started yet.
            $nextRequest = 0;

            $requestsToRun = $this->requestsToRun();
            for (; $nextRequest < $requestsToRun; $nextRequest++) {
                $this->initRequest($nextRequest, $multiHandle, $requestsMap);
                $requestsRunning++;
            }

            do {
                do {
                    $handlerStatus = curl_multi_exec($multiHandle, $active);
                } while ($handlerStatus === CURLM_CALL_MULTI_PERFORM);

                // Stop processing altogether when the multi handle reports an error.
                if ($handlerStatus !== CURLM_OK) {
                    break;
                }

                while ($completed = curl_multi_info_read($multiHandle)) {
                    $this->processSingleRequest($completed, $multiHandle, $requestsMap);
                    $requestsRunning--;

                    // A slot was freed: top up with queued requests.
                    while ($this->shouldStartNewRequest($requestsRunning, $nextRequest)) {
                        $this->initRequest($nextRequest, $multiHandle, $requestsMap);
                        $requestsRunning++;
                        $nextRequest++;
                    }
                }

                $this->saveCycles();
            } while ($active || count($requestsMap));
        } finally {
            // Always release the multi handle and clear the queue.
            $this->reset($multiHandle);
        }
    }
170
171
    /**
     * Build the individual cURL options for a queued request.
     *
     * Starts from the options stored with the request (e.g. the client
     * certificate) and adds the transfer options: POST body, headers, URL,
     * HTTP version and timeouts.
     *
     * @param array $request Queue entry as created by addRequest()
     *
     * @return array cURL options keyed by CURLOPT_* constant
     */
    private function buildOptions(array $request)
    {
        $options = $request['options'];

        $options[CURLOPT_RETURNTRANSFER] = true;
        $options[CURLOPT_NOSIGNAL] = 1;
        $options[CURLOPT_HTTPHEADER] = $request['headers'];
        $options[CURLOPT_URL] = $request['url'];
        $options[CURLOPT_POST] = 1;
        $options[CURLOPT_POSTFIELDS] = $request['post_data'];
        // Request HTTP/2; 3 is the numeric value of CURL_HTTP_VERSION_2_0 and is
        // kept as a fallback for builds that do not expose the constant.
        $options[CURLOPT_HTTP_VERSION] = defined('CURL_HTTP_VERSION_2_0') ? CURL_HTTP_VERSION_2_0 : 3;

        if (version_compare($this->curlVersion, '7.16.2') >= 0) {
            // Millisecond timeouts are only used from cURL 7.16.2 onwards.
            $options[CURLOPT_CONNECTTIMEOUT_MS] = $this->timeout;
            $options[CURLOPT_TIMEOUT_MS] = $this->timeout;
        } else {
            // Round up to whole seconds: rounding to nearest would turn any
            // timeout below 500 ms into 0, which cURL treats as "no timeout".
            $timeoutSeconds = (int) ceil($this->timeout / 1000);

            $options[CURLOPT_CONNECTTIMEOUT] = $timeoutSeconds;
            $options[CURLOPT_TIMEOUT] = $timeoutSeconds;
        }

        return $options;
    }
203
204
205
    /**
     * Initialize the cURL handle for a queued request and attach it to the
     * multi handle.
     *
     * The merged options are stored on the queue entry under 'options_set',
     * the request timer is started, and the handle is registered in the
     * request map so the completed transfer can be matched back to its
     * queue entry.
     *
     * @param $requestNumber Index of the request in the queue
     * @param $multiHandler  The cURL multi handle
     * @param $requestsMap   Map of curl handle key => request index (updated in place)
     *
     * @throws RuntimeException When the options are rejected by cURL or the
     *                          handle cannot be added to the multi handle
     */
    private function initRequest($requestNumber, $multiHandler, &$requestsMap)
    {
        $curlHandler = curl_init();

        $request =& $this->requests[$requestNumber];

        $options = $this->buildOptions($request);
        $request['options_set'] = $options; //merged options

        if (false === curl_setopt_array($curlHandler, $options)) {
            throw new RuntimeException('Invalid options passed to curl');
        }

        $this->addTimer($request);

        // A handle that was not attached never completes; registering it in the
        // map anyway would keep the send loop waiting for it forever.
        $addStatus = curl_multi_add_handle($multiHandler, $curlHandler);
        if ($addStatus !== CURLM_OK) {
            throw new RuntimeException(
                sprintf('Unable to add the curl handle to the multi handle (error code %d)', $addStatus)
            );
        }

        // Add the curl handle of the new request to the request map.
        // NOTE(review): the (int) cast assumes curl handles are resources;
        // confirm behaviour on PHP versions where they are objects.
        $curlHandleHash = (int)$curlHandler;
        $requestsMap[$curlHandleHash] = $requestNumber;
    }
233
234
235
    /**
     * Handle a completed transfer: collect its result, detach the handle and
     * hand a Response to the request's callback.
     *
     * The response body is replaced by an empty string when the transfer is
     * considered failed (see serverRespondedWithError()).
     *
     * @param       $completed   Entry returned by curl_multi_info_read()
     * @param       $multiHandle The cURL multi handle
     * @param array $requestsMap Map of curl handle key => request index (updated in place)
     *
     * @return void
     */
    private function processSingleRequest($completed, $multiHandle, array &$requestsMap)
    {
        $handle = $completed['handle'];
        $mapKey = (int)$handle;
        $queuedRequest =& $this->requests[$requestsMap[$mapKey]];

        $info = curl_getinfo($handle);
        $elapsed = $this->stopTimer($queuedRequest);
        $info['time'] = $elapsed;

        $body = $this->serverRespondedWithError($handle, $info)
            ? ''
            : curl_multi_getcontent($handle);

        // The transfer is finished: forget it and detach its handle.
        unset($requestsMap[$mapKey]);
        curl_multi_remove_handle($multiHandle, $handle);

        $notify = $queuedRequest['callback'];
        $notify(new Response((int) $info['http_code'], (string) $body, (float) $elapsed));
    }
278
279
    /**
     * Start timing a request.
     *
     * Records the current time under 'timer' and initialises 'time' to 0.
     *
     * @param array $request Queue entry, modified in place
     */
    private function addTimer(array &$request)
    {
        $startedAt = microtime(true);

        $request['timer'] = $startedAt; // start time
        $request['time'] = 0;           // placeholder until the timer is stopped
    }
289
290
    /**
     * Stop the request timer.
     *
     * Stores the elapsed time under 'time' and removes the 'timer' start mark.
     *
     * @param array $request Queue entry, modified in place
     *
     * @return float Seconds elapsed since addTimer() was called for this request
     */
    private function stopTimer(array &$request)
    {
        // Elapsed time is "now minus start"; the reverse order yields a
        // negative duration.
        $elapsed = microtime(true) - $request['timer'];
        $request['time'] = $elapsed;
        unset($request['timer']);

        return $elapsed;
    }
305
306
    /**
     * Number of requests to start initially: the queue size, capped at the
     * concurrency limit.
     *
     * @return int
     */
    private function requestsToRun(): int
    {
        $queuedCount = count($this->requests);

        return (int) min($this->maxConcurrentRequests, $queuedCount);
    }
315
316
    /**
     * Tell whether a transfer failed.
     *
     * A transfer counts as failed when cURL reports an error for the handle
     * or when the HTTP status code is anything other than 200.
     *
     * @param       $curlHandle  Handle of the completed transfer
     * @param array $requestInfo Transfer information holding 'http_code'
     *
     * @return bool
     */
    private function serverRespondedWithError($curlHandle, array $requestInfo): bool
    {
        if (curl_errno($curlHandle) !== 0) {
            return true;
        }

        return (int) $requestInfo['http_code'] !== 200;
    }
328
329
    /**
     * Tell whether another request may be started without exceeding the
     * concurrency limit.
     *
     * @param int $requestsRunning Number of requests currently in flight
     *
     * @return bool
     */
    private function isUnderRequestLimit(int $requestsRunning): bool
    {
        return $this->maxConcurrentRequests > $requestsRunning;
    }
341
342
    /**
     * Tell whether the queue still holds a request at the given position.
     *
     * @param int $iterator Index of the next request to start
     *
     * @return bool
     */
    private function hasRequestsLeft(int $iterator): bool
    {
        if ($iterator >= count($this->requests)) {
            return false;
        }

        return isset($this->requests[$iterator]);
    }
353
354
    /**
     * Tell whether a new request should be started: there must be a free
     * concurrency slot and a queued request left to run.
     *
     * @param int $requestsRunning Number of requests currently in flight
     * @param int $iterator        Index of the next request to start
     *
     * @return bool
     */
    private function shouldStartNewRequest(int $requestsRunning, int $iterator): bool
    {
        if (!$this->isUnderRequestLimit($requestsRunning)) {
            return false;
        }

        return $this->hasRequestsLeft($iterator);
    }
366
367
    /**
     * Pause for 15 microseconds so the send loop does not poll continuously.
     */
    private function saveCycles()
    {
        usleep(15);
    }
375
}
376