SelectStage::processStage()   B
last analyzed

Complexity

Conditions 5
Paths 13

Size

Total Lines 30
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 5

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 30
ccs 21
cts 21
cp 1
rs 8.439
cc 5
eloc 18
nc 13
nop 1
crap 5
1
<?php
2
/**
 * Async sockets
 *
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
 *
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Exception\TimeoutException;
13
use AsyncSockets\RequestExecutor\ExecutionContext;
14
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
15
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
16
use AsyncSockets\RequestExecutor\Specification\ConnectionLessSocketSpecification;
17
use AsyncSockets\Socket\AsyncSelector;
18
19
/**
 * Class SelectStage
 *
 * Pipeline stage that picks the request descriptors which are ready for processing.
 * Descriptors matching ConnectionLessSocketSpecification are returned straight away;
 * otherwise every descriptor is registered in the AsyncSelector and the stage waits on
 * the selector using the smallest remaining timeout among the descriptors.
 */
class SelectStage extends AbstractTimeAwareStage
{
    /**
     * Selector
     *
     * @var AsyncSelector
     */
    private $selector;

    /**
     * SelectStage constructor.
     *
     * @param RequestExecutorInterface $executor         Request executor
     * @param EventCaller              $eventCaller      Event caller
     * @param ExecutionContext         $executionContext Execution context
     * @param AsyncSelector            $selector         Async selector
     */
    public function __construct(
        RequestExecutorInterface $executor,
        EventCaller $eventCaller,
        ExecutionContext $executionContext,
        AsyncSelector $selector
    ) {
        parent::__construct($executor, $eventCaller, $executionContext);
        $this->selector = $selector;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the connection-less descriptors when at least one is present (the selector is
     * not touched in that case). Otherwise returns the unique descriptors reported by the
     * selector, each marked with the matching RDS_* state, or an empty array when the
     * selector throws TimeoutException.
     */
    public function processStage(array $requestDescriptors)
    {
        $this->initLastIoOperationInfo($requestDescriptors);

        $udpOperations = $this->findConnectionLessSockets($requestDescriptors);
        if ($udpOperations) {
            // do not perform actual select, since these requestDescriptors must be processed immediately
            return $udpOperations;
        }

        /** @var RequestDescriptor[] $requestDescriptors */
        foreach ($requestDescriptors as $descriptor) {
            // a single operation may be interested in several I/O types
            foreach ($descriptor->getOperation()->getTypes() as $type) {
                $this->selector->addSocketOperation($descriptor, $type);
            }
        }

        try {
            $timeout = $this->calculateSelectorTimeout($requestDescriptors);
            $context = $this->selector->select($timeout['sec'], $timeout['microsec']);

            // a descriptor may be reported in more than one list, so duplicates are removed
            return $this->getUniqueDescriptors(
                array_merge(
                    $this->markDescriptors($context->getRead(), RequestDescriptor::RDS_READ),
                    $this->markDescriptors($context->getWrite(), RequestDescriptor::RDS_WRITE),
                    $this->markDescriptors($context->getOob(), RequestDescriptor::RDS_OOB)
                )
            );
        } catch (TimeoutException $e) {
            // nothing became ready in time
            return [];
        }
    }

    /**
     * Return unique descriptors from given array
     *
     * Uniqueness is decided by object identity (spl_object_hash); the result is re-indexed
     * from zero and keeps the order of first occurrence.
     *
     * @param RequestDescriptor[] $descriptors List of descriptors
     *
     * @return RequestDescriptor[]
     */
    private function getUniqueDescriptors(array $descriptors)
    {
        $result = [];
        foreach ($descriptors as $descriptor) {
            $result[spl_object_hash($descriptor)] = $descriptor;
        }

        return array_values($result);
    }

    /**
     * Mark given list of descriptors
     *
     * Calls setState() with the given state on every descriptor of the list.
     *
     * @param RequestDescriptor[] $descriptors List of descriptors
     * @param int                 $state       State to set in descriptor
     *
     * @return RequestDescriptor[] The given list of descriptors
     */
    private function markDescriptors(array $descriptors, $state)
    {
        foreach ($descriptors as $descriptor) {
            $descriptor->setState($state);
        }

        return $descriptors;
    }

    /**
     * Initialize information about last I/O operation
     *
     * Invokes setSocketOperationTime() with META_LAST_IO_START_TIME for every descriptor.
     *
     * @param RequestDescriptor[] $requestDescriptors List of requestDescriptors to apply
     *
     * @return void
     */
    private function initLastIoOperationInfo(array $requestDescriptors)
    {
        foreach ($requestDescriptors as $descriptor) {
            $this->setSocketOperationTime($descriptor, RequestExecutorInterface::META_LAST_IO_START_TIME);
        }
    }

    /**
     * Calculate selector timeout according to given array of active socket keys
     *
     * Both values are null when no descriptor yields a timeout (empty list, or every
     * descriptor is configured with WAIT_FOREVER). Otherwise "sec" holds the whole seconds
     * and "microsec" the remainder, always as an integer in the range 0..999999.
     *
     * @param RequestDescriptor[] $activeDescriptors Active socket keys
     *
     * @return array { "sec": int|null, "microsec": int|null }
     */
    private function calculateSelectorTimeout(array $activeDescriptors)
    {
        $microtime  = microtime(true);
        $minTimeout = null;

        foreach ($activeDescriptors as $descriptor) {
            $timeout    = $this->getSingleSocketTimeout($descriptor, $microtime);
            $minTimeout = $this->getMinTimeout($timeout, $minTimeout);
        }

        if ($minTimeout === null) {
            // NOTE(review): presumably the selector treats null as "no time limit" - confirm
            return [
                'sec'      => null,
                'microsec' => null,
            ];
        }

        $seconds      = (int) floor($minTimeout);
        $microseconds = (int) round(((float) $minTimeout - $seconds) * 1000000);
        if ($microseconds >= 1000000) {
            // the fractional part was rounded up to a whole second, carry it over
            ++$seconds;
            $microseconds -= 1000000;
        }

        return [
            'sec'      => $seconds,
            'microsec' => $microseconds,
        ];
    }

    /**
     * Return minimum timeout from two values
     *
     * The new value replaces the old one when nothing has been chosen yet ($oldValue is
     * null) or when the new value is positive and smaller than the old one.
     *
     * NOTE(review): a new value of 0 is accepted only when $oldValue is null, so the result
     * depends on the iteration order when a zero timeout is present - confirm this is intended.
     *
     * @param double|null $newValue New value
     * @param double|null $oldValue Old value
     *
     * @return double|null
     */
    private function getMinTimeout($newValue, $oldValue)
    {
        return (($newValue > 0 && $newValue < $oldValue) || $oldValue === null) ?
            $newValue :
            $oldValue;
    }

    /**
     * Calculate timeout value for single socket operation
     *
     * Returns null when the timeout setting equals WAIT_FOREVER. Otherwise returns the
     * configured timeout reduced by the time elapsed since the value reported by
     * timeSinceLastIo(), never less than 0. When timeSinceLastIo() reports null the
     * configured timeout is used as is.
     *
     * @param RequestDescriptor $operation Operation object
     * @param double            $microTime Current time with microseconds
     *
     * @return double|null
     */
    private function getSingleSocketTimeout(RequestDescriptor $operation, $microTime)
    {
        $desiredTimeout    = $this->timeoutSetting($operation);
        $lastOperationTime = $this->timeSinceLastIo($operation);

        if ($desiredTimeout === RequestExecutorInterface::WAIT_FOREVER) {
            return null;
        }

        // $lastOperationTime is subtracted from the current time, i.e. it is used as a timestamp here
        $result = $lastOperationTime === null ?
            $desiredTimeout :
            $desiredTimeout - ($microTime - $lastOperationTime);

        return $result >= 0 ? $result : 0;
    }

    /**
     * Find descriptors satisfying ConnectionLessSocketSpecification and return them as result
     *
     * @param RequestDescriptor[] $requestDescriptors List of all requestDescriptors
     *
     * @return RequestDescriptor[] List of matching descriptors, re-indexed from zero
     */
    private function findConnectionLessSockets(array $requestDescriptors)
    {
        $result        = [];
        $specification = new ConnectionLessSocketSpecification();
        foreach ($requestDescriptors as $operation) {
            if ($specification->isSatisfiedBy($operation)) {
                $result[] = $operation;
            }
        }

        return $result;
    }
}
219