ManualScheduler::interruptConsume()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 0
crap 2
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\CURLException;
5
use React\Promise\Deferred;
6
use React\Promise\PromiseInterface;
7
8
class ManualScheduler extends AbstractScheduler
9
{
10
    /**
11
     * cURL handles that have not yet been dispatched.
12
     * @var array
13
     */
14
    private $queue = [];
15
16
    /**
     * Constructor.
     * Stores the option set and the cURL multi handle on the scheduler;
     * nothing else is done with either value here.
     * @param CoOption $options
     * @param resource $mh      curl_multi
     */
    public function __construct(CoOption $options, $mh)
    {
        $this->options = $options;
        $this->mh = $mh;
    }
27
28
    /**
     * Register a cURL handle and hand back a promise for it.
     *
     * When the 'concurrency' option is positive and the number of handles
     * already added has reached it, the handle is pushed into the waiting
     * queue; otherwise it is passed to curl_multi_add_handle() right away.
     * @param resource $ch
     * @return PromiseInterface
     */
    public function add($ch)
    {
        $deferred = new Deferred();
        if (
            $this->options['concurrency'] > 0
            && count($this->added) >= $this->options['concurrency']
        ) {
            // Concurrency limit reached: park the handle until a slot frees up.
            $this->addReserved($ch, $deferred);
        } else {
            $this->addImmediate($ch, $deferred);
        }
        return $deferred->promise();
    }
42
43
    /**
     * Tell whether the scheduler holds no cURL handles at all,
     * i.e. neither added ones nor ones still waiting in the queue.
     * @return bool
     */
    public function isEmpty()
    {
        return !($this->added || $this->queue);
    }
51
52
    /**
     * Attach a handle to the multi handle via curl_multi_add_handle().
     *
     * On success the handle is recorded in $this->added and, when a Deferred
     * is supplied, that Deferred is recorded in $this->deferreds under the
     * same key. On failure the supplied Deferred (if any) is rejected with a
     * RuntimeException and nothing is recorded.
     * @param resource $ch
     * @param Deferred $deferred
     */
    private function addImmediate($ch, Deferred $deferred = null)
    {
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno === CURLM_OK) {
            // Both maps are keyed by the handle's string form.
            $id = (string)$ch;
            $this->added[$id] = $ch;
            if ($deferred) {
                $this->deferreds[$id] = $deferred;
            }
            return;
        }
        // @codeCoverageIgnoreStart
        $msg = curl_multi_strerror($errno) . ": $ch";
        if ($deferred) {
            $deferred->reject(new \RuntimeException($msg));
        }
        // @codeCoverageIgnoreEnd
    }
70
71
    /**
     * Park a handle in the waiting queue instead of dispatching it.
     *
     * The handle is stored in $this->queue keyed by its string form; a
     * supplied Deferred is stored in $this->deferreds under the same key.
     * @param resource $ch
     * @param Deferred $deferred
     */
    private function addReserved($ch, Deferred $deferred = null)
    {
        $id = (string)$ch;
        $this->queue[$id] = $ch;
        if ($deferred) {
            $this->deferreds[$id] = $deferred;
        }
    }
81
82
    /**
     * Dispatch the next cURL handle from the waiting queue, if there is one.
     *
     * The handle's Deferred was stored in $this->deferreds by addReserved().
     * It is looked up and passed on to addImmediate() so that a failing
     * curl_multi_add_handle() rejects the promise instead of leaving it
     * pending forever.
     */
    protected function interruptConsume()
    {
        if (!$this->queue) {
            return;
        }
        $ch = array_shift($this->queue);
        $id = (string)$ch;
        $deferred = isset($this->deferreds[$id]) ? $this->deferreds[$id] : null;
        $this->addImmediate($ch, $deferred);
        if (!isset($this->added[$id])) {
            // addImmediate() failed and has already rejected the Deferred;
            // drop the stale entry so it does not linger for a handle that
            // was never attached.
            unset($this->deferreds[$id]);
        }
    }
91
}
92