1 | <?php |
||
21 | class SocketBag implements SocketBagInterface |
||
22 | { |
||
23 | /** |
||
24 | * RequestExecutorInterface |
||
25 | * |
||
26 | * @var RequestExecutorInterface |
||
27 | */ |
||
28 | private $executor; |
||
29 | |||
30 | /** |
||
31 | * Target metadata items |
||
32 | * |
||
33 | * @var RequestDescriptor[] |
||
34 | */ |
||
35 | private $items; |
||
36 | |||
37 | /** |
||
38 | * Default connection timeout |
||
39 | * |
||
40 | * @var double |
||
41 | */ |
||
42 | private $connectTimeout; |
||
43 | |||
44 | /** |
||
45 | * Default I/O timeout |
||
46 | * |
||
47 | * @var double |
||
48 | */ |
||
49 | private $ioTimeout; |
||
50 | |||
51 | /** |
||
52 | * SocketBag constructor. |
||
53 | * |
||
54 | * @param RequestExecutorInterface $executor Owner RequestExecutor |
||
55 | * @param double $connectTimeout Default connection timeout |
||
56 | * @param double $ioTimeout Default I/O timeout |
||
57 | */ |
||
58 | 102 | public function __construct(RequestExecutorInterface $executor, $connectTimeout, $ioTimeout) |
|
65 | |||
66 | /** {@inheritdoc} */ |
||
67 | 1 | public function count() |
|
71 | |||
72 | |||
73 | /** {@inheritdoc} */ |
||
74 | 92 | public function addSocket( |
|
75 | SocketInterface $socket, |
||
76 | OperationInterface $operation, |
||
77 | array $metadata = null, |
||
78 | EventHandlerInterface $eventHandlers = null |
||
79 | ) { |
||
80 | 92 | $hash = $this->getOperationStorageKey($socket); |
|
81 | 92 | if (isset($this->items[$hash])) { |
|
82 | 1 | throw new \LogicException('Can not add socket twice.'); |
|
83 | } |
||
84 | |||
85 | 92 | $meta = array_merge( |
|
86 | [ |
||
87 | 92 | RequestExecutorInterface::META_ADDRESS => null, |
|
88 | RequestExecutorInterface::META_USER_CONTEXT => null, |
||
89 | RequestExecutorInterface::META_SOCKET_STREAM_CONTEXT => null, |
||
90 | RequestExecutorInterface::META_MIN_RECEIVE_SPEED => null, |
||
91 | RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION => null, |
||
92 | RequestExecutorInterface::META_MIN_SEND_SPEED => null, |
||
93 | RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION => null, |
||
94 | 92 | RequestExecutorInterface::META_CONNECTION_TIMEOUT => $this->connectTimeout, |
|
95 | 92 | RequestExecutorInterface::META_IO_TIMEOUT => $this->ioTimeout, |
|
96 | ], |
||
97 | 92 | $metadata ?: [], |
|
98 | [ |
||
99 | 92 | RequestExecutorInterface::META_CONNECTION_START_TIME => null, |
|
100 | RequestExecutorInterface::META_CONNECTION_FINISH_TIME => null, |
||
101 | RequestExecutorInterface::META_LAST_IO_START_TIME => null, |
||
102 | RequestExecutorInterface::META_BYTES_SENT => 0, |
||
103 | RequestExecutorInterface::META_BYTES_RECEIVED => 0, |
||
104 | RequestExecutorInterface::META_REQUEST_COMPLETE => false, |
||
105 | RequestExecutorInterface::META_RECEIVE_SPEED => 0, |
||
106 | RequestExecutorInterface::META_SEND_SPEED => 0, |
||
107 | ] |
||
108 | ); |
||
109 | |||
110 | 92 | $this->items[$hash] = new RequestDescriptor($socket, $operation, $meta, $eventHandlers); |
|
111 | 92 | } |
|
112 | |||
113 | /** {@inheritdoc} */ |
||
114 | 1 | public function getSocketOperation(SocketInterface $socket) |
|
115 | { |
||
116 | 1 | return $this->requireDescriptor($socket)->getOperation(); |
|
117 | } |
||
118 | |||
119 | /** {@inheritdoc} */ |
||
120 | 1 | public function setSocketOperation(SocketInterface $socket, OperationInterface $operation) |
|
121 | { |
||
122 | 1 | $this->requireDescriptor($socket)->setOperation($operation); |
|
123 | 1 | } |
|
124 | |||
125 | /** {@inheritdoc} */ |
||
126 | 3 | public function hasSocket(SocketInterface $socket) |
|
127 | { |
||
128 | 3 | $hash = $this->getOperationStorageKey($socket); |
|
129 | 3 | return isset($this->items[$hash]); |
|
130 | } |
||
131 | |||
132 | /** {@inheritdoc} */ |
||
133 | 3 | public function removeSocket(SocketInterface $socket) |
|
134 | { |
||
135 | 3 | $key = $this->getOperationStorageKey($socket); |
|
136 | 3 | if (!isset($this->items[$key])) { |
|
137 | 1 | return; |
|
138 | } |
||
139 | |||
140 | 2 | $meta = $this->items[$key]->getMetadata(); |
|
141 | 2 | if (!$meta[RequestExecutorInterface::META_REQUEST_COMPLETE] && $this->executor->isExecuting()) { |
|
142 | 1 | throw new \LogicException('Can not remove unprocessed socket during request processing.'); |
|
143 | } |
||
144 | |||
145 | 1 | unset($this->items[$key]); |
|
146 | 1 | } |
|
147 | |||
148 | /** {@inheritdoc} */ |
||
149 | 1 | public function postponeSocket(SocketInterface $socket) |
|
|
|||
150 | { |
||
151 | 1 | $key = $this->getOperationStorageKey($socket); |
|
152 | 1 | if (!isset($this->items[$key])) { |
|
153 | 1 | return; |
|
154 | } |
||
155 | |||
156 | $this->items[$key]->postpone(); |
||
157 | } |
||
158 | |||
159 | /** {@inheritdoc} */ |
||
160 | 2 | public function resetSpeedRateCounters(SocketInterface $socket) |
|
161 | { |
||
162 | 2 | $this->requireDescriptor($socket)->resetCounter(RequestDescriptor::COUNTER_RECV_MIN_RATE); |
|
163 | 1 | } |
|
164 | |||
165 | /** {@inheritdoc} */ |
||
166 | 61 | public function getSocketMetaData(SocketInterface $socket) |
|
167 | { |
||
168 | 61 | return $this->requireDescriptor($socket)->getMetadata(); |
|
169 | } |
||
170 | |||
171 | /** {@inheritdoc} */ |
||
172 | 17 | public function setSocketMetaData(SocketInterface $socket, $key, $value = null) |
|
173 | { |
||
174 | $writableKeys = [ |
||
175 | 17 | RequestExecutorInterface::META_ADDRESS => 1, |
|
176 | RequestExecutorInterface::META_USER_CONTEXT => 1, |
||
177 | RequestExecutorInterface::META_CONNECTION_TIMEOUT => 1, |
||
178 | RequestExecutorInterface::META_IO_TIMEOUT => 1, |
||
179 | RequestExecutorInterface::META_SOCKET_STREAM_CONTEXT => 1, |
||
180 | RequestExecutorInterface::META_MIN_RECEIVE_SPEED => 1, |
||
181 | RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION => 1, |
||
182 | RequestExecutorInterface::META_MIN_SEND_SPEED => 1, |
||
183 | RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION => 1, |
||
184 | ]; |
||
185 | |||
186 | 17 | if (!is_array($key)) { |
|
187 | 17 | $key = [ $key => $value ]; |
|
188 | } |
||
189 | |||
190 | 17 | $key = array_intersect_key($key, $writableKeys); |
|
191 | 17 | $this->requireDescriptor($socket)->setMetadata($key); |
|
192 | 17 | } |
|
193 | |||
194 | /** |
||
195 | * Return socket key in internal storage |
||
196 | * |
||
197 | * @param SocketInterface $socket Socket object |
||
198 | * |
||
199 | * @return string |
||
200 | */ |
||
201 | 95 | private function getOperationStorageKey(SocketInterface $socket) |
|
205 | |||
206 | /** |
||
207 | * Require operation descriptor for given socket |
||
208 | * |
||
209 | * @param SocketInterface $socket Socket object |
||
210 | * |
||
211 | * @return RequestDescriptor |
||
212 | * @throws \OutOfBoundsException |
||
213 | */ |
||
214 | 64 | private function requireDescriptor(SocketInterface $socket) |
|
223 | |||
224 | /** |
||
225 | * Return metadata items |
||
226 | * |
||
227 | * @return RequestDescriptor[] |
||
228 | */ |
||
229 | 68 | public function getItems() |
|
233 | } |
||
234 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.