Completed
Branch 0.4-dev (79cc15)
by Evgenij
03:32
created

ReadIoHandler::handleRecvSpeed()   B

Complexity

Conditions 4
Paths 9

Size

Total Lines 28
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 28
ccs 19
cts 19
cp 1
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 19
nc 9
nop 2
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\AcceptEvent;
13
use AsyncSockets\Event\ReadEvent;
14
use AsyncSockets\Exception\AcceptException;
15
use AsyncSockets\Exception\TooSlowRecvException;
16
use AsyncSockets\Frame\AcceptedFrame;
17
use AsyncSockets\Frame\FramePickerInterface;
18
use AsyncSockets\Frame\PartialFrame;
19
use AsyncSockets\Operation\OperationInterface;
20
use AsyncSockets\Operation\ReadOperation;
21
use AsyncSockets\RequestExecutor\EventHandlerInterface;
22
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
23
use AsyncSockets\RequestExecutor\Metadata\SpeedRateCounter;
24
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
25
26
/**
 * Class ReadIoHandler
 *
 * Handles requests whose operation is a ReadOperation. While a read is in
 * progress the handler passes itself to the socket as the frame picker: it
 * forwards every call to the picker configured on the operation and counts
 * the bytes handed over, so that received-bytes and receive-speed metadata
 * of the descriptor can be updated once the read has finished.
 */
class ReadIoHandler extends AbstractOobHandler implements FramePickerInterface
{
    /**
     * Amount of bytes read by last operation
     *
     * Only set while handleOperation() is running; unset afterwards.
     *
     * @var int
     */
    private $bytesRead;

    /**
     * Actual frame picker
     *
     * Only set while handleOperation() is running; unset afterwards.
     *
     * @var FramePickerInterface
     */
    private $realFramePicker;

    /** {@inheritdoc} */
    public function supports(OperationInterface $operation)
    {
        return $operation instanceof ReadOperation;
    }

    /**
     * {@inheritdoc}
     *
     * Reads one response from the socket and dispatches on its type:
     *  - PartialFrame: the same operation is returned;
     *  - AcceptedFrame: an AcceptEvent is invoked and a fresh ReadOperation is returned;
     *  - anything else: a ReadEvent is invoked and the event's next operation is returned.
     *
     * An AcceptException results in a fresh ReadOperation. Any other exception is
     * rethrown after the bytes read so far have been accounted on the descriptor.
     */
    protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    ) {
        /** @var ReadOperation $operation */
        $operation = $descriptor->getOperation();
        $socket    = $descriptor->getSocket();

        $meta    = $executor->socketBag()->getSocketMetaData($socket);
        $context = $meta[RequestExecutorInterface::META_USER_CONTEXT];
        $result  = null;

        // Per-read state used by the FramePickerInterface methods of this class
        $this->bytesRead       = 0;
        $this->realFramePicker = $operation->getFramePicker();

        try {
            // The handler itself is the picker, so pickUpData() can count bytes
            $response = $socket->read($this);

            if ($response instanceof PartialFrame) {
                $result = $operation;
            } elseif ($response instanceof AcceptedFrame) {
                $event = new AcceptEvent(
                    $executor,
                    $socket,
                    $context,
                    $response->getClientSocket(),
                    $response->getRemoteAddress()
                );

                $eventHandler->invokeEvent($event);
                $result = new ReadOperation();
            } else {
                $event = new ReadEvent(
                    $executor,
                    $socket,
                    $context,
                    $response,
                    false
                );

                $eventHandler->invokeEvent($event);
                $result = $event->getNextOperation();
            }
        } catch (AcceptException $e) {
            $result = new ReadOperation();
        } catch (\Exception $e) {
            $this->releaseReadState($descriptor);
            throw $e;
        }

        $this->releaseReadState($descriptor);

        return $result;
    }

    /**
     * Drop per-read state and account the bytes counted during the read
     *
     * The state is cleared before the accounting because the accounting may
     * throw (see handleRecvSpeed()); clearing first guarantees the handler never
     * keeps a stale picker or byte counter when that happens.
     *
     * @param RequestDescriptor $descriptor The descriptor
     *
     * @return void
     */
    private function releaseReadState(RequestDescriptor $descriptor)
    {
        $bytesRead = $this->bytesRead;
        unset($this->realFramePicker, $this->bytesRead);

        $this->appendReadBytes($descriptor, $bytesRead);
    }

    /**
     * Append given amount of read bytes to descriptor
     *
     * Adds the amount to the META_BYTES_RECEIVED metadata value and then
     * updates the receive speed via handleRecvSpeed().
     *
     * @param RequestDescriptor $descriptor The descriptor
     * @param int               $bytesRead Amount of read bytes
     *
     * @return void
     */
    private function appendReadBytes(RequestDescriptor $descriptor, $bytesRead)
    {
        $meta = $descriptor->getMetadata();
        $descriptor->setMetadata(
            RequestExecutorInterface::META_BYTES_RECEIVED,
            $meta[RequestExecutorInterface::META_BYTES_RECEIVED] + $bytesRead
        );

        $this->handleRecvSpeed($descriptor, $bytesRead);
    }

    /**
     * Process download rate for descriptor
     *
     * Lazily registers a SpeedRateCounter on the descriptor, advances it by the
     * given amount of bytes and stores the resulting speed as META_RECEIVE_SPEED.
     * When the counter reports no current speed the previous metadata value is kept.
     *
     * @param RequestDescriptor $descriptor The descriptor
     * @param int               $bytesRead Amount of read bytes
     *
     * @return void
     * @throws TooSlowRecvException When the counter raises \OverflowException
     */
    private function handleRecvSpeed(RequestDescriptor $descriptor, $bytesRead)
    {
        $meta    = $descriptor->getMetadata();
        $counter = $descriptor->getCounter(RequestDescriptor::COUNTER_TRANSFER_MIN_RATE);

        try {
            if (!$counter) {
                $counter = new SpeedRateCounter(
                    $meta[RequestExecutorInterface::META_MIN_RECEIVE_SPEED],
                    $meta[RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION]
                );
                // Start the new counter at the connection finish time with zero bytes
                $counter->advance($meta[RequestExecutorInterface::META_CONNECTION_FINISH_TIME], 0);
                $descriptor->registerCounter(RequestDescriptor::COUNTER_TRANSFER_MIN_RATE, $counter);
            }

            $counter->advance(microtime(true), $bytesRead);
            $descriptor->setMetadata(
                RequestExecutorInterface::META_RECEIVE_SPEED,
                $counter->getCurrentSpeed() ?: $meta[RequestExecutorInterface::META_RECEIVE_SPEED]
            );
        } catch (\OverflowException $e) {
            throw TooSlowRecvException::tooSlowDataReceiving(
                $descriptor->getSocket(),
                $counter->getCurrentSpeed(),
                $counter->getCurrentDuration()
            );
        }
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the real frame picker.
     */
    public function isEof()
    {
        return $this->realFramePicker->isEof();
    }

    /**
     * {@inheritdoc}
     *
     * Counts the byte length of the chunk, then delegates to the real frame picker.
     */
    public function pickUpData($chunk, $remoteAddress)
    {
        $this->bytesRead += strlen($chunk);

        return $this->realFramePicker->pickUpData($chunk, $remoteAddress);
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the real frame picker.
     */
    public function createFrame()
    {
        return $this->realFramePicker->createFrame();
    }
}
194