Test Failed
Pull Request — master (#47)
by Michael
02:07
created

CurlMultiSender::send()   D

Complexity

Conditions 8
Paths 0

Size

Total Lines 38
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 38
rs 4
c 0
b 0
f 0
cc 8
eloc 25
nc 0
nop 0
1
<?php
2
3
namespace Apple\ApnPush\Protocol\Http\Sender;
4
5
use Apple\ApnPush\Protocol\Http\Request;
6
use Apple\ApnPush\Protocol\Http\Response;
7
use RuntimeException;
8
9
/**
 * HTTP sender that dispatches queued requests concurrently via the cURL multi interface.
 *
 * Requests are queued with addRequest() and executed by send(). While sending, at most
 * $maxConcurrentRequests transfers are in flight at once; as soon as one finishes its
 * callback is invoked with a Response and the next queued request is started.
 *
 * @package Apple\ApnPush\Protocol\Http\Sender
 */
class CurlMultiSender implements HttpSenderInterface
{
    /**
     * The curl version used, as reported by curl_version()['version']
     *
     * @var string
     */
    private $curlVersion;

    /**
     * Max number of simultaneous connections allowed
     *
     * @var int
     */
    private $maxConcurrentRequests = 10;

    /**
     * Global timeout (in milliseconds) applied to every request, both for
     * connecting and for the whole transfer
     *
     * @var int
     */
    private $timeout = 5000;

    /**
     * The request queue. Each entry is an array with the keys
     * 'url', 'post_data', 'callback', 'options' and 'headers'.
     *
     * @var array
     */
    private $requests = [];

    /**
     * CurlMultiSender constructor.
     *
     * @param int $maxRequests Max number of simultaneous requests
     * @param int $timeout     Per-request timeout in milliseconds
     */
    public function __construct($maxRequests = 10, $timeout = 5000)
    {
        $this->maxRequests($maxRequests);
        $this->timeout($timeout);

        $this->curlVersion = curl_version()['version'];
    }

    /**
     * Set max concurrent requests.
     *
     * Values below 1 are raised to 1: with a limit of zero (or less) send()
     * would never start a transfer and would silently drop the whole queue.
     *
     * @param int $maxRequests
     *
     * @return HttpSenderInterface
     */
    public function maxRequests(int $maxRequests): HttpSenderInterface
    {
        $this->maxConcurrentRequests = max(1, $maxRequests);

        return $this;
    }

    /**
     * Set the global requests timeout.
     *
     * @param int $timeout Timeout in milliseconds
     *
     * @return HttpSenderInterface
     */
    public function timeout(int $timeout): HttpSenderInterface
    {
        $this->timeout = $timeout;

        return $this;
    }

    /**
     * Add a request to the request queue
     *
     * Nothing is sent until send() is called.
     *
     * @param Request  $request
     * @param callable $callback Invoked with the Response once the request completes
     *
     * @return void
     */
    public function addRequest(Request $request, callable $callback): void
    {
        $inlineHeaders = [];
        $options = [];

        // cURL expects headers as a flat list of "Name: value" strings.
        foreach ($request->getHeaders() as $name => $value) {
            $inlineHeaders[] = sprintf('%s: %s', $name, $value);
        }

        if ($request->getCertificate()) {
            $options[CURLOPT_SSLCERT] = $request->getCertificate();
            $options[CURLOPT_SSLCERTPASSWD] = $request->getCertificatePassPhrase();
        }

        $this->requests[] = [
            'url'       => $request->getUrl(),
            'post_data' => $request->getContent(),
            'callback'  => $callback,
            'options'   => $options,
            'headers'   => $inlineHeaders,
        ];
    }

    /**
     * Reset request queue and release the multi handle
     *
     * @param resource|object $multiCurlHandle cURL multi handle
     *
     * @return void
     */
    private function reset($multiCurlHandle)
    {
        $this->requests = [];
        curl_multi_close($multiCurlHandle);
    }

    /**
     * Execute the request queue
     *
     * Starts up to the configured number of concurrent requests, then keeps
     * driving the multi handle, handing every finished transfer to its
     * callback and topping the pool up from the queue. If the multi handle
     * reports an error the loop stops and callbacks of unfinished requests
     * are not invoked. The queue is emptied and the multi handle closed in
     * every case, including when a callback or initRequest() throws.
     *
     * @return void
     *
     * @throws RuntimeException When a request cannot be configured
     */
    public function send(): void
    {
        $active = null;
        $requestsMap = [];
        $requestsRunning = 0;
        $multiHandle = curl_multi_init();

        try {
            // Fill the pool with the first batch of requests.
            $nextRequest = $this->requestsToRun();
            for ($i = 0; $i < $nextRequest; $i++) {
                $this->initRequest($i, $multiHandle, $requestsMap);
                $requestsRunning++;
            }

            do {
                do {
                    $handlerStatus = curl_multi_exec($multiHandle, $active);
                } while ($handlerStatus === CURLM_CALL_MULTI_PERFORM);

                if ($handlerStatus !== CURLM_OK) {
                    break;
                }

                while ($completed = curl_multi_info_read($multiHandle)) {
                    $this->processSingleRequest($completed, $multiHandle, $requestsMap);
                    $requestsRunning--;

                    // A slot was freed: start queued requests until the pool is full again.
                    while ($this->shouldStartNewRequest($requestsRunning, $nextRequest)) {
                        $this->initRequest($nextRequest, $multiHandle, $requestsMap);
                        $requestsRunning++;
                        $nextRequest++;
                    }
                }

                // Block until there is socket activity instead of spinning.
                // curl_multi_select() returns -1 when it could not wait at
                // all; fall back to a short sleep in that case.
                if ($active && curl_multi_select($multiHandle, 0.05) === -1) {
                    $this->saveCycles();
                }
            } while ($active || count($requestsMap));
        } finally {
            $this->reset($multiHandle);
        }
    }

    /**
     * Build individual cURL options for a request
     *
     * The per-request options collected in addRequest() are used as the base
     * and the transport options set here take precedence over them.
     *
     * @param array $request Queue entry as created by addRequest()
     *
     * @return array
     */
    private function buildOptions(array $request): array
    {
        $options = $request['options'];

        $options[CURLOPT_RETURNTRANSFER] = true;
        $options[CURLOPT_NOSIGNAL] = 1;
        $options[CURLOPT_HTTPHEADER] = $request['headers'];
        $options[CURLOPT_URL] = $request['url'];
        $options[CURLOPT_POST] = 1;
        $options[CURLOPT_POSTFIELDS] = $request['post_data'];
        // 3 is the numeric value of CURL_HTTP_VERSION_2_0; kept as a fallback
        // for builds where the constant is not defined.
        $options[CURLOPT_HTTP_VERSION] = defined('CURL_HTTP_VERSION_2_0') ? CURL_HTTP_VERSION_2_0 : 3;

        if (version_compare($this->curlVersion, '7.16.2') >= 0) {
            $options[CURLOPT_CONNECTTIMEOUT_MS] = $this->timeout;
            $options[CURLOPT_TIMEOUT_MS] = $this->timeout;
        } else {
            // Older cURL only supports whole seconds. Round up so that a
            // sub-second timeout does not become 0, which disables the timeout.
            $seconds = $this->timeout > 0 ? (int) max(1, ceil($this->timeout / 1000)) : 0;

            $options[CURLOPT_CONNECTTIMEOUT] = $seconds;
            $options[CURLOPT_TIMEOUT] = $seconds;
        }

        return $options;
    }

    /**
     * Initialize Curl request
     *
     * Creates the easy handle for the queued request, attaches it to the
     * multi handle and records which queue entry the handle belongs to.
     *
     * @param int             $requestNumber Index of the request in the queue
     * @param resource|object $multiHandler  cURL multi handle
     * @param array           $requestsMap   Map of handle id => queue index (modified)
     *
     * @return void
     *
     * @throws RuntimeException When cURL rejects the request options
     */
    private function initRequest($requestNumber, $multiHandler, &$requestsMap): void
    {
        $curlHandler = curl_init();

        $request =& $this->requests[$requestNumber];

        $options = $this->buildOptions($request);
        $request['options_set'] = $options; //merged options

        if (false === curl_setopt_array($curlHandler, $options)) {
            throw new RuntimeException('Invalid options passed to curl');
        }

        $this->addTimer($request);

        curl_multi_add_handle($multiHandler, $curlHandler);

        //add curl handle of a new request to the request map
        $requestsMap[$this->handleId($curlHandler)] = $requestNumber;
    }

    /**
     * Process the response from a request.
     *
     * Looks up the queue entry of the finished handle, detaches the handle
     * from the multi handle and passes a Response (status code, body, elapsed
     * time) to the request's callback. The body is an empty string when the
     * transfer failed or the status code is not 200.
     *
     * @param array           $completed   Message returned by curl_multi_info_read()
     * @param resource|object $multiHandle cURL multi handle
     * @param array           $requestsMap Map of handle id => queue index (modified)
     *
     * @return void
     */
    private function processSingleRequest($completed, $multiHandle, array &$requestsMap): void
    {
        $curlHandle = $completed['handle'];
        $curlHandleHash = $this->handleId($curlHandle);
        $request =& $this->requests[$requestsMap[$curlHandleHash]];

        $requestInfo = curl_getinfo($curlHandle);
        $requestInfo['time'] = $time = $this->stopTimer($request);

        if (true === $this->serverRespondedWithError($curlHandle, $requestInfo)) {
            $response = '';
        } else {
            $response = curl_multi_getcontent($curlHandle);
        }

        // remove completed request and its curl handle
        unset($requestsMap[$curlHandleHash]);
        curl_multi_remove_handle($multiHandle, $curlHandle);

        $callback = $request['callback'];

        $callback(
            new Response(
                (int) $requestInfo['http_code'],
                (string) $response,
                (float) $time
            )
        );
    }

    /**
     * Get a unique integer identifying a cURL easy handle.
     *
     * Handles are resources up to PHP 7 (the integer cast yields the resource
     * id) and objects from PHP 8 on, where an integer cast would yield 1 for
     * every handle and make them collide in the requests map.
     *
     * @param resource|object $curlHandle
     *
     * @return int
     */
    private function handleId($curlHandle): int
    {
        return is_object($curlHandle) ? spl_object_id($curlHandle) : (int) $curlHandle;
    }

    /**
     * Add a timer on the request.
     *
     * @param array $request Queue entry (modified)
     *
     * @return void
     */
    private function addTimer(array &$request): void
    {
        $request['timer'] = microtime(true); //start time
        $request['time'] = 0; //default if not overridden by time later
    }

    /**
     * Stop request timer.
     *
     * @param array $request Queue entry (modified)
     *
     * @return float Seconds elapsed since addTimer() was called for the request
     */
    private function stopTimer(array &$request): float
    {
        // now - start, so that the elapsed time is positive
        $elapsed = microtime(true) - $request['timer'];
        $request['time'] = $elapsed;
        unset($request['timer']);

        return $elapsed;
    }

    /**
     * Determine how many requests should run in the initial batch.
     *
     * @return int
     */
    private function requestsToRun(): int
    {
        return (int) min($this->maxConcurrentRequests, count($this->requests));
    }

    /**
     * Determine if the request failed
     *
     * A request counts as failed when cURL reports an error for the handle
     * or the HTTP status code is anything other than 200.
     *
     * @param resource|object $curlHandle
     * @param array           $requestInfo Result of curl_getinfo() for the handle
     *
     * @return bool
     */
    private function serverRespondedWithError($curlHandle, array $requestInfo): bool
    {
        return curl_errno($curlHandle) !== 0 || (int) $requestInfo['http_code'] !== 200;
    }

    /**
     * Determine if the running requests is
     * still under the concurrent requests limit.
     *
     * @param int $requestsRunning
     *
     * @return bool
     */
    private function isUnderRequestLimit(int $requestsRunning): bool
    {
        return $requestsRunning < $this->maxConcurrentRequests;
    }

    /**
     * Determine if we have any request left to run
     *
     * @param int $iterator Queue index of the next request to start
     *
     * @return bool
     */
    private function hasRequestsLeft(int $iterator): bool
    {
        return $iterator < count($this->requests) && isset($this->requests[$iterator]);
    }

    /**
     * Determine if we should start a new request
     *
     * @param int $requestsRunning
     * @param int $iterator        Queue index of the next request to start
     *
     * @return bool
     */
    private function shouldStartNewRequest(int $requestsRunning, int $iterator): bool
    {
        return $this->isUnderRequestLimit($requestsRunning) && $this->hasRequestsLeft($iterator);
    }

    /**
     * Save cpu cycles
     * prevent continuous checking
     *
     * @return void
     */
    private function saveCycles(): void
    {
        usleep(15);
    }
}
382