1 | <?php |
||
22 | class SelectStage extends AbstractTimeAwareStage |
||
23 | { |
||
24 | /** |
||
25 | * Selector |
||
26 | * |
||
27 | * @var AsyncSelector |
||
28 | */ |
||
29 | private $selector; |
||
30 | |||
31 | /** |
||
32 | * SelectStage constructor. |
||
33 | * |
||
34 | * @param RequestExecutorInterface $executor Request executor |
||
35 | * @param EventCaller $eventCaller Event caller |
||
36 | * @param ExecutionContext $executionContext Execution context |
||
37 | * @param AsyncSelector $selector Async selector |
||
38 | */ |
||
39 | 86 | public function __construct( |
|
48 | |||
49 | /** {@inheritdoc} */ |
||
50 | 72 | public function processStage(array $requestDescriptors) |
|
51 | { |
||
52 | 72 | $this->initLastIoOperationInfo($requestDescriptors); |
|
53 | 72 | $udpOperations = $this->findConnectionLessSockets($requestDescriptors); |
|
54 | 72 | if ($udpOperations) { |
|
55 | // do not perform actual select, since these requestDescriptors must be processed immediately |
||
56 | 2 | return $udpOperations; |
|
57 | } |
||
58 | |||
59 | /** @var RequestDescriptor[] $requestDescriptors */ |
||
60 | 70 | foreach ($requestDescriptors as $descriptor) { |
|
61 | 69 | foreach ($descriptor->getOperation()->getTypes() as $type) { |
|
62 | 69 | $this->selector->addSocketOperation($descriptor, $type); |
|
63 | 69 | } |
|
64 | 70 | } |
|
65 | |||
66 | try { |
||
67 | 70 | $timeout = $this->calculateSelectorTimeout($requestDescriptors); |
|
68 | 70 | $context = $this->selector->select($timeout['sec'], $timeout['microsec']); |
|
69 | 62 | return $this->getUniqueDescriptors( |
|
70 | 62 | array_merge( |
|
71 | 62 | $this->markDescriptors($context->getRead(), RequestDescriptor::RDS_READ), |
|
72 | 62 | $this->markDescriptors($context->getWrite(), RequestDescriptor::RDS_WRITE), |
|
73 | 62 | $this->markDescriptors($context->getOob(), RequestDescriptor::RDS_OOB) |
|
74 | 62 | ) |
|
75 | 62 | ); |
|
76 | 10 | } catch (TimeoutException $e) { |
|
77 | 9 | return []; |
|
78 | } |
||
79 | } |
||
80 | |||
81 | /** |
||
82 | * Return unique descriptors from given array |
||
83 | * |
||
84 | * @param RequestDescriptor[] $descriptors List of descriptors |
||
85 | * |
||
86 | * @return RequestDescriptor[] |
||
87 | */ |
||
88 | 62 | private function getUniqueDescriptors(array $descriptors) |
|
97 | |||
98 | /** |
||
99 | * Mark given list of descriptors |
||
100 | * |
||
101 | * @param RequestDescriptor[] $descriptors List of descriptors |
||
102 | * @param int $state State to set in descriptor |
||
103 | * |
||
104 | * @return RequestDescriptor[] The given list of descriptors |
||
105 | */ |
||
106 | 62 | private function markDescriptors(array $descriptors, $state) |
|
114 | |||
115 | /** |
||
116 | * Initialize information about last I/O operation |
||
117 | * |
||
118 | * @param RequestDescriptor[] $requestDescriptors List of requestDescriptors to apply |
||
119 | * |
||
120 | * @return void |
||
121 | */ |
||
122 | 72 | private function initLastIoOperationInfo(array $requestDescriptors) |
|
123 | { |
||
124 | 72 | foreach ($requestDescriptors as $descriptor) { |
|
125 | 71 | $this->setSocketOperationTime($descriptor, RequestExecutorInterface::META_LAST_IO_START_TIME); |
|
126 | 72 | } |
|
127 | 72 | } |
|
128 | |||
129 | /** |
||
130 | * Calculate selector timeout according to given array of active socket keys |
||
131 | * |
||
132 | * @param RequestDescriptor[] $activeDescriptors Active socket keys |
||
133 | * |
||
134 | * @return array { "sec": int, "microsec": int } |
||
135 | */ |
||
136 | 70 | private function calculateSelectorTimeout(array $activeDescriptors) |
|
159 | |||
160 | /** |
||
161 | * Return minimum timeout from two values |
||
162 | * |
||
163 | * @param double $newValue New value |
||
164 | * @param double $oldValue Old value |
||
165 | * |
||
166 | * @return double |
||
167 | */ |
||
168 | 69 | private function getMinTimeout($newValue, $oldValue) |
|
174 | |||
175 | /** |
||
176 | * Calculate timeout value for single socket operation |
||
177 | * |
||
178 | * @param RequestDescriptor $operation Operation object |
||
179 | * @param double $microTime Current time with microseconds |
||
180 | * |
||
181 | * @return double|null |
||
182 | */ |
||
183 | 69 | private function getSingleSocketTimeout(RequestDescriptor $operation, $microTime) |
|
198 | |||
199 | /** |
||
200 | * Find descriptors with UdpClientSocket and return them as result |
||
201 | * |
||
202 | * @param RequestDescriptor[] $requestDescriptors List of all requestDescriptors |
||
203 | * |
||
204 | * @return RequestDescriptor[] List of udp "clients" |
||
205 | */ |
||
206 | 72 | private function findConnectionLessSockets(array $requestDescriptors) |
|
218 | } |
||
219 |