Completed
Branch 0.4-dev (50fa9b)
by Evgenij
02:41
created

AbstractOobHandler::getTransferCounterMeta()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 2.0004

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 20
cts 21
cp 0.9524
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 19
nc 2
nop 2
crap 2.0004
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\ReadEvent;
13
use AsyncSockets\Frame\RawFramePicker;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\RequestExecutor\EventHandlerInterface;
16
use AsyncSockets\RequestExecutor\ExecutionContext;
17
use AsyncSockets\RequestExecutor\IoHandlerInterface;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\Metadata\SpeedRateCounter;
20
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
21
22
/**
23
 * Class AbstractOobHandler
24
 */
25
abstract class AbstractOobHandler implements IoHandlerInterface
26
{
27
    /**
28
     * Operation to descriptor state map
29
     *
30
     * @var array
31
     */
32
    private static $stateMap = [
33
        OperationInterface::OPERATION_READ  => RequestDescriptor::RDS_READ,
34
        OperationInterface::OPERATION_WRITE => RequestDescriptor::RDS_WRITE,
35
    ];
36
37
    /**
38
     * {@inheritdoc}
39
     */
40 112
    final public function handle(
41
        RequestDescriptor $descriptor,
42
        RequestExecutorInterface $executor,
43
        EventHandlerInterface $eventHandler,
44
        ExecutionContext $executionContext
45
    ) {
46 112
        $result = $this->handleOobData($descriptor, $executor, $eventHandler, $executionContext);
47 112
        if ($result) {
48 5
            return $result;
49
        }
50
51 107
        $state = $this->getInvokeState($descriptor->getOperation());
52 107
        if ($descriptor->hasState($state)) {
53 102
            $descriptor->clearState($state);
54
55 102
            $result = $this->handleOperation($descriptor, $executor, $eventHandler, $executionContext);
56 59
        }
57
58 64
        return $result;
59
    }
60
61
    /**
62
     * Process given operation
63
     *
64
     * @param RequestDescriptor        $descriptor       Request descriptor
65
     * @param RequestExecutorInterface $executor         Executor, processing operation
66
     * @param EventHandlerInterface    $eventHandler     Event handler for this operation
67
     * @param ExecutionContext         $executionContext Execution context
68
     *
69
     * @return OperationInterface|null Next operation to pass in socket. Return null,
70
     *      if next operation is not required. Return $operation parameter, if operation is not completed yet
71
     */
72
    abstract protected function handleOperation(
73
        RequestDescriptor $descriptor,
74
        RequestExecutorInterface $executor,
75
        EventHandlerInterface $eventHandler,
76
        ExecutionContext $executionContext
77
    );
78
79
    /**
80
     * Return state which should invoke this operation
81
     *
82
     * @param OperationInterface $operation Operation object
83
     *
84
     * @return int RequestDescriptor::RDS_* constant
85
     */
86 107
    private function getInvokeState(OperationInterface $operation)
87
    {
88 107
        $type = $operation->getType();
89
90 107
        return isset(self::$stateMap[$type]) ? self::$stateMap[$type] : 0;
91
    }
92
93
    /**
94
     * Handle OOB data
95
     *
96
     * @param RequestDescriptor        $descriptor       Request descriptor
97
     * @param RequestExecutorInterface $executor         Executor, processing operation
98
     * @param EventHandlerInterface    $eventHandler     Event handler for this operation
99
     * @param ExecutionContext         $executionContext Execution context
100
     *
101
     * @return OperationInterface|null Operation to return to user or null to continue normal processing
102
     */
103 112
    private function handleOobData(
104
        RequestDescriptor $descriptor,
105
        RequestExecutorInterface $executor,
106
        EventHandlerInterface $eventHandler,
107
        ExecutionContext $executionContext
108
    ) {
109 112
        if (!$descriptor->hasState(RequestDescriptor::RDS_OOB)) {
110 102
            return null;
111
        }
112
113 10
        $descriptor->clearState(RequestDescriptor::RDS_OOB);
114
115 10
        $picker = new RawFramePicker();
116 10
        $socket = $descriptor->getSocket();
117 10
        $meta   = $descriptor->getMetadata();
118 10
        $frame  = $socket->read($picker, true);
119 10
        $event  = new ReadEvent(
120 10
            $executor,
121 10
            $socket,
122 10
            $meta[ RequestExecutorInterface::META_USER_CONTEXT ],
123 10
            $frame,
124
            true
125 10
        );
126
127 10
        $eventHandler->invokeEvent($event, $executor, $socket, $executionContext);
128
129 10
        return $event->getNextOperation();
130
    }
131
132
    /**
133
     * Creates transfer rate counter with given parameters
134
     *
135
     * @param string            $name Counter name to create
136
     * @param int               $minRate Minimum speed setting for counter
137
     * @param int               $duration Max duration of low speed
138
     * @param RequestDescriptor $descriptor Request descriptor for counter
139
     *
140
     * @return SpeedRateCounter
141
     */
142 79
    private function createRateCounter($name, $minRate, $duration, RequestDescriptor $descriptor)
143
    {
144 79
        $counter = $descriptor->getCounter($name);
145
146 79
        if (!$counter) {
147 79
            $meta    = $descriptor->getMetadata();
148 79
            $counter = new SpeedRateCounter($minRate, $duration);
149 79
            $time    = $meta[RequestExecutorInterface::META_LAST_IO_START_TIME] ?:
150 79
                        $meta[RequestExecutorInterface::META_CONNECTION_FINISH_TIME];
151 79
            $counter->advance($time, 0);
152 79
            $descriptor->registerCounter($name, $counter);
153 79
        }
154
155 79
        return $counter;
156
    }
157
158
    /**
159
     * Process transfer bytes counter
160
     *
161
     * @param string            $name Counter name operating this transfer
162
     * @param RequestDescriptor $descriptor Socket descriptor
163
     * @param int               $bytes Amount of bytes processed by transfer
164
     *
165
     * @return void
166
     */
167 79
    protected function handleTransferCounter($name, RequestDescriptor $descriptor, $bytes)
168
    {
169 79
        $meta = $descriptor->getMetadata();
170
171 79
        $info = $this->getTransferCounterMeta($name, $meta);
172
173 79
        $descriptor->resetCounter($info['resetCounter']);
174
175 79
        $meta = $descriptor->getMetadata();
176 79
        $descriptor->setMetadata($info['bytesCounter'], $meta[$info['bytesCounter']] + $bytes);
177
178 79
        $counter = $descriptor->getCounter($name);
179
        try {
180 79
            if (!$counter) {
181 79
                $counter = $this->createRateCounter($name, $info['minSpeed'], $info['duration'], $descriptor);
182 79
            }
183
184 79
            $counter->advance(microtime(true), $bytes);
185 78
            $descriptor->setMetadata(
186 78
                $info['speedCounter'],
187 78
                $counter->getCurrentSpeed() ?: $meta[$info['speedCounter']]
188 78
            );
189 79
        } catch (\OverflowException $e) {
190 1
            $callable = $info['exception'];
191
192 1
            throw $callable(
193 1
                $descriptor->getSocket(),
194 1
                $counter->getCurrentSpeed(),
195 1
                $counter->getCurrentDuration()
196 1
            );
197
        }
198 78
    }
199
200
    /**
201
     * Return metadata for transfer rate counter
202
     *
203
     * @param string $name Counter name
204
     * @param array  $meta Socket metadata
205
     *
206
     * @return array
207
     */
208 79
    private function getTransferCounterMeta($name, array $meta)
209
    {
210
        $map = [
211 79
            RequestDescriptor::COUNTER_RECV_MIN_RATE => [
212 79
                'resetCounter' => RequestDescriptor::COUNTER_SEND_MIN_RATE,
213 79
                'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED ],
214 79
                'duration'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION ],
215 79
                'bytesCounter' => RequestExecutorInterface::META_BYTES_RECEIVED,
216 79
                'speedCounter' => RequestExecutorInterface::META_RECEIVE_SPEED,
217 79
                'exception'    => [ 'AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataReceiving' ],
218 79
            ],
219 79
            RequestDescriptor::COUNTER_SEND_MIN_RATE => [
220 79
                'resetCounter' => RequestDescriptor::COUNTER_RECV_MIN_RATE,
221 79
                'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED ],
222 79
                'duration'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION ],
223 79
                'bytesCounter' => RequestExecutorInterface::META_BYTES_SENT,
224 79
                'speedCounter' => RequestExecutorInterface::META_SEND_SPEED,
225 79
                'exception'    => [ 'AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataSending' ],
226 79
            ],
227 79
        ];
228
229 79
        if (!isset($map[ $name ])) {
230
            throw new \LogicException('Can not process counter ' . $name . ' in transfer operation');
231
        }
232
233 79
        return $map[ $name ];
234
    }
235
}
236