AbstractOobHandler::handleTransferCounter()   B
last analyzed

Complexity

Conditions 4
Paths 7

Size

Total Lines 32
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 32
ccs 23
cts 23
cp 1
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 20
nc 7
nop 3
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\ReadEvent;
13
use AsyncSockets\Frame\RawFramePicker;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\RequestExecutor\EventHandlerInterface;
16
use AsyncSockets\RequestExecutor\ExecutionContext;
17
use AsyncSockets\RequestExecutor\IoHandlerInterface;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\Metadata\SpeedRateCounter;
20
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
21
22
/**
 * Base I/O handler that gives out-of-band (OOB) data priority over the regular operation.
 *
 * {@see AbstractOobHandler::handle()} first checks the descriptor for the RDS_OOB state and
 * dispatches a read event for it; only when that yields no next operation is the regular
 * processing delegated to the concrete subclass via {@see AbstractOobHandler::handleOperation()}.
 *
 * Subclasses additionally get a shared helper for transfer speed accounting,
 * {@see AbstractOobHandler::handleTransferCounter()}.
 */
abstract class AbstractOobHandler implements IoHandlerInterface
{
    /**
     * Maps an operation type to the descriptor state flag which triggers its processing
     *
     * @var array
     */
    private static $stateMap = [
        OperationInterface::OPERATION_READ  => RequestDescriptor::RDS_READ,
        OperationInterface::OPERATION_WRITE => RequestDescriptor::RDS_WRITE,
    ];

    /**
     * {@inheritdoc}
     *
     * Pending OOB data is handled before anything else. If the OOB event handler supplies a next
     * operation, it is returned right away and the regular operation is not touched. Otherwise the
     * regular operation is processed only when the descriptor carries a state flag that matches
     * both the operation types and this handler's type; that flag is cleared before delegating.
     *
     * Note: the $operation argument is not read here, the operation is always taken
     * from the descriptor.
     */
    final public function handle(
        OperationInterface $operation,
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler,
        ExecutionContext $executionContext
    ) {
        $oobOperation = $this->handleOobData($descriptor, $executor, $eventHandler, $executionContext);
        if ($oobOperation) {
            return $oobOperation;
        }

        $invokeState = $this->getInvokeState($descriptor->getOperation());
        if (!$descriptor->hasState($invokeState)) {
            // Nothing to do for this handler; hand back the (empty) OOB result unchanged.
            return $oobOperation;
        }

        $descriptor->clearState($invokeState);

        return $this->handleOperation(
            $descriptor->getOperation(),
            $descriptor,
            $executor,
            $eventHandler,
            $executionContext
        );
    }

    /**
     * Tell which kind of descriptor state this handler is responsible for
     *
     * The value is used as a bit mask against the states derived from the operation types.
     *
     * @return int One of RequestDescriptor::RDS_* constant
     */
    abstract protected function getHandlerType();

    /**
     * Perform the regular (non-OOB) processing of an operation
     *
     * @param OperationInterface       $operation        Operation to process
     * @param RequestDescriptor        $descriptor       Request descriptor
     * @param RequestExecutorInterface $executor         Executor, processing operation
     * @param EventHandlerInterface    $eventHandler     Event handler for this operation
     * @param ExecutionContext         $executionContext Execution context
     *
     * @return OperationInterface|null Next operation to pass in socket. Return null,
     *      if next operation is not required. Return $operation parameter, if operation is not completed yet
     */
    abstract protected function handleOperation(
        OperationInterface $operation,
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler,
        ExecutionContext $executionContext
    );

    /**
     * Compute the descriptor state which must be set for this handler to process the operation
     *
     * The state flags of every known operation type are combined and then restricted
     * to the flag(s) reported by getHandlerType(). Unknown types contribute nothing.
     *
     * @param OperationInterface $operation Operation object
     *
     * @return int Set of RequestDescriptor::RDS_* constant
     */
    private function getInvokeState(OperationInterface $operation)
    {
        $mask = 0;
        foreach ($operation->getTypes() as $type) {
            if (isset(self::$stateMap[$type])) {
                $mask |= self::$stateMap[$type];
            }
        }

        return $mask & $this->getHandlerType();
    }

    /**
     * Process pending OOB data, if the descriptor is flagged with RDS_OOB
     *
     * The flag is cleared, a raw frame is read from the socket and a ReadEvent carrying that frame
     * and the user context from the descriptor metadata is dispatched through the event handler.
     *
     * @param RequestDescriptor        $descriptor       Request descriptor
     * @param RequestExecutorInterface $executor         Executor, processing operation
     * @param EventHandlerInterface    $eventHandler     Event handler for this operation
     * @param ExecutionContext         $executionContext Execution context
     *
     * @return OperationInterface|null Operation to return to user or null to continue normal processing
     */
    private function handleOobData(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler,
        ExecutionContext $executionContext
    ) {
        if (!$descriptor->hasState(RequestDescriptor::RDS_OOB)) {
            return null;
        }

        $descriptor->clearState(RequestDescriptor::RDS_OOB);

        $framePicker = new RawFramePicker();
        $socket      = $descriptor->getSocket();
        $metadata    = $descriptor->getMetadata();

        // The "true" flags presumably select/mark out-of-band data - confirm against
        // the socket read() and ReadEvent signatures.
        $oobFrame  = $socket->read($framePicker, true);
        $readEvent = new ReadEvent(
            $executor,
            $socket,
            $metadata[ RequestExecutorInterface::META_USER_CONTEXT ],
            $oobFrame,
            true
        );

        $eventHandler->invokeEvent($readEvent, $executor, $socket, $executionContext);

        return $readEvent->getNextOperation();
    }

    /**
     * Return the rate counter registered under the given name, creating it on demand
     *
     * A newly created counter is advanced once with zero bytes at the last I/O start time
     * (or at the connection finish time when the former is empty) and then registered
     * in the descriptor.
     *
     * @param string            $name Counter name to create
     * @param int               $minRate Minimum speed setting for counter
     * @param int               $duration Max duration of low speed
     * @param RequestDescriptor $descriptor Request descriptor for counter
     *
     * @return SpeedRateCounter
     */
    private function createRateCounter($name, $minRate, $duration, RequestDescriptor $descriptor)
    {
        $registered = $descriptor->getCounter($name);
        if ($registered) {
            return $registered;
        }

        $metadata   = $descriptor->getMetadata();
        $newCounter = new SpeedRateCounter($minRate, $duration);

        $startTime = $metadata[RequestExecutorInterface::META_LAST_IO_START_TIME];
        if (!$startTime) {
            $startTime = $metadata[RequestExecutorInterface::META_CONNECTION_FINISH_TIME];
        }

        $newCounter->advance($startTime, 0);
        $descriptor->registerCounter($name, $newCounter);

        return $newCounter;
    }

    /**
     * Account transferred bytes and update the transfer speed for the given counter
     *
     * Resets the counter of the opposite direction, adds $bytes to the matching byte total in the
     * descriptor metadata, advances the rate counter and stores the resulting speed. When the counter
     * reports no speed, the previously stored speed value is kept.
     *
     * If the rate counter throws \OverflowException, it is replaced by the exception built by the
     * factory configured for the counter (a static method of SlowSpeedTransferException).
     *
     * @param string            $name Counter name operating this transfer
     * @param RequestDescriptor $descriptor Socket descriptor
     * @param int               $bytes Amount of bytes processed by transfer
     *
     * @return void
     * @throws \LogicException If the counter name is not a known transfer counter
     */
    protected function handleTransferCounter($name, RequestDescriptor $descriptor, $bytes)
    {
        $settings = $this->getTransferCounterMeta($name, $descriptor->getMetadata());

        $descriptor->resetCounter($settings['resetCounter']);

        // Metadata is fetched again after the reset, exactly as the totals are about to be updated.
        $metadata = $descriptor->getMetadata();
        $bytesKey = $settings['bytesCounter'];
        $speedKey = $settings['speedCounter'];
        $descriptor->setMetadata($bytesKey, $metadata[$bytesKey] + $bytes);

        $counter = $descriptor->getCounter($name);
        try {
            if (!$counter) {
                $counter = $this->createRateCounter(
                    $name,
                    $settings['minSpeed'],
                    $settings['duration'],
                    $descriptor
                );
            }

            $counter->advance(microtime(true), $bytes);

            $speed = $counter->getCurrentSpeed();
            if (!$speed) {
                $speed = $metadata[$speedKey];
            }

            $descriptor->setMetadata($speedKey, $speed);
        } catch (\OverflowException $e) {
            $exceptionFactory = $settings['exception'];

            throw $exceptionFactory(
                $descriptor->getSocket(),
                $counter->getCurrentSpeed(),
                $counter->getCurrentDuration()
            );
        }
    }

    /**
     * Return settings describing how the given transfer counter has to be processed
     *
     * The result contains: 'resetCounter' (name of the opposite direction counter), 'minSpeed' and
     * 'duration' (limits taken from $meta), 'bytesCounter' and 'speedCounter' (metadata keys to update)
     * and 'exception' (callable creating the exception for a too slow transfer).
     *
     * @param string $name Counter name
     * @param array  $meta Socket metadata
     *
     * @return array
     * @throws \LogicException If there are no settings for the given counter name
     */
    private function getTransferCounterMeta($name, array $meta)
    {
        $exceptionClass = 'AsyncSockets\Exception\SlowSpeedTransferException';

        $receiveSettings = [
            'resetCounter' => RequestDescriptor::COUNTER_SEND_MIN_RATE,
            'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED ],
            'duration'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION ],
            'bytesCounter' => RequestExecutorInterface::META_BYTES_RECEIVED,
            'speedCounter' => RequestExecutorInterface::META_RECEIVE_SPEED,
            'exception'    => [ $exceptionClass, 'tooSlowDataReceiving' ],
        ];

        $sendSettings = [
            'resetCounter' => RequestDescriptor::COUNTER_RECV_MIN_RATE,
            'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED ],
            'duration'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION ],
            'bytesCounter' => RequestExecutorInterface::META_BYTES_SENT,
            'speedCounter' => RequestExecutorInterface::META_SEND_SPEED,
            'exception'    => [ $exceptionClass, 'tooSlowDataSending' ],
        ];

        $settingsByCounter = [
            RequestDescriptor::COUNTER_RECV_MIN_RATE => $receiveSettings,
            RequestDescriptor::COUNTER_SEND_MIN_RATE => $sendSettings,
        ];

        if (isset($settingsByCounter[ $name ])) {
            return $settingsByCounter[ $name ];
        }

        throw new \LogicException('Can not process counter ' . $name . ' in transfer operation');
    }
}
255