Completed
Branch 0.4-dev (97f287)
by Evgenij
02:20
created

AbstractOobHandler::getTransferCounterMeta()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 2.0004

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 20
cts 21
cp 0.9524
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 19
nc 2
nop 2
crap 2.0004
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\ReadEvent;
13
use AsyncSockets\Frame\RawFramePicker;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\RequestExecutor\EventHandlerInterface;
16
use AsyncSockets\RequestExecutor\IoHandlerInterface;
17
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
18
use AsyncSockets\RequestExecutor\Metadata\SpeedRateCounter;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
21
/**
 * Class AbstractOobHandler
 *
 * Base I/O handler which first processes pending out-of-band (OOB) data for a
 * descriptor and only then delegates the regular operation to the concrete
 * implementation. Also provides transfer accounting helpers for subclasses.
 */
abstract class AbstractOobHandler implements IoHandlerInterface
{
    /**
     * Operation to descriptor state map
     *
     * @var array
     */
    private static $stateMap = [
        OperationInterface::OPERATION_READ  => RequestDescriptor::RDS_READ,
        OperationInterface::OPERATION_WRITE => RequestDescriptor::RDS_WRITE,
    ];

    /**
     * Constant part of transfer counter settings, indexed by counter name.
     *
     * 'minSpeedKey' and 'durationKey' hold metadata keys whose values are resolved
     * per call in getTransferCounterMeta(); the other entries are returned as is.
     *
     * @var array
     */
    private static $transferCounterMap = [
        RequestDescriptor::COUNTER_RECV_MIN_RATE => [
            'resetCounter' => RequestDescriptor::COUNTER_SEND_MIN_RATE,
            'minSpeedKey'  => RequestExecutorInterface::META_MIN_RECEIVE_SPEED,
            'durationKey'  => RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION,
            'bytesCounter' => RequestExecutorInterface::META_BYTES_RECEIVED,
            'speedCounter' => RequestExecutorInterface::META_RECEIVE_SPEED,
            'exception'    => [ 'AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataReceiving' ],
        ],
        RequestDescriptor::COUNTER_SEND_MIN_RATE => [
            'resetCounter' => RequestDescriptor::COUNTER_RECV_MIN_RATE,
            'minSpeedKey'  => RequestExecutorInterface::META_MIN_SEND_SPEED,
            'durationKey'  => RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION,
            'bytesCounter' => RequestExecutorInterface::META_BYTES_SENT,
            'speedCounter' => RequestExecutorInterface::META_SEND_SPEED,
            'exception'    => [ 'AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataSending' ],
        ],
    ];

    /**
     * {@inheritdoc}
     *
     * OOB data is handled first: if its event handler produced a next operation,
     * that operation is returned immediately and the regular operation is not
     * processed during this call. Otherwise the regular operation is processed
     * only when the descriptor carries the state mapped to its operation type.
     */
    final public function handle(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    ) {
        $result = $this->handleOobData($descriptor, $executor, $eventHandler);
        if ($result) {
            return $result;
        }

        $state = $this->getInvokeState($descriptor->getOperation());
        if ($descriptor->hasState($state)) {
            // The state flag is cleared before the operation runs
            $descriptor->clearState($state);

            $result = $this->handleOperation($descriptor, $executor, $eventHandler);
        }

        return $result;
    }

    /**
     * Process given operation
     *
     * @param RequestDescriptor        $descriptor Request descriptor
     * @param RequestExecutorInterface $executor Executor, processing operation
     * @param EventHandlerInterface    $eventHandler Event handler for this operation
     *
     * @return OperationInterface|null Next operation to pass in socket. Return null,
     *      if next operation is not required. Return $operation parameter, if operation is not completed yet
     */
    abstract protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    );

    /**
     * Return state which should invoke this operation
     *
     * @param OperationInterface $operation Operation object
     *
     * @return int RequestDescriptor::RDS_* constant, or 0 if the operation type has no mapped state
     */
    private function getInvokeState(OperationInterface $operation)
    {
        $type = $operation->getType();

        return isset(self::$stateMap[$type]) ? self::$stateMap[$type] : 0;
    }

    /**
     * Handle OOB data
     *
     * Does nothing unless the descriptor has the RDS_OOB state. Otherwise clears
     * that state, reads a raw frame from the socket, wraps it into a ReadEvent and
     * passes the event to the event handler.
     *
     * @param RequestDescriptor        $descriptor Request descriptor
     * @param RequestExecutorInterface $executor Executor, processing operation
     * @param EventHandlerInterface    $eventHandler Event handler for this operation
     *
     * @return OperationInterface|null Operation to return to user or null to continue normal processing
     */
    private function handleOobData(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    ) {
        if (!$descriptor->hasState(RequestDescriptor::RDS_OOB)) {
            return null;
        }

        $descriptor->clearState(RequestDescriptor::RDS_OOB);

        $picker = new RawFramePicker();
        $socket = $descriptor->getSocket();
        $meta   = $descriptor->getMetadata();
        // NOTE(review): the trailing `true` arguments of read() and ReadEvent presumably
        // flag out-of-band mode - confirm against their signatures.
        $frame  = $socket->read($picker, true);
        $event  = new ReadEvent(
            $executor,
            $socket,
            $meta[ RequestExecutorInterface::META_USER_CONTEXT ],
            $frame,
            true
        );

        $eventHandler->invokeEvent($event);

        return $event->getNextOperation();
    }

    /**
     * Creates transfer rate counter with given parameters
     *
     * If the descriptor already has a counter with this name, it is returned as is.
     * A new counter is advanced once with zero bytes at the last I/O start time,
     * falling back to the connection finish time when the former is empty, and is
     * then registered in the descriptor.
     *
     * @param string            $name Counter name to create
     * @param int               $minRate Minimum speed setting for counter
     * @param int               $duration Max duration of low speed
     * @param RequestDescriptor $descriptor Request descriptor for counter
     *
     * @return SpeedRateCounter
     */
    private function createRateCounter($name, $minRate, $duration, RequestDescriptor $descriptor)
    {
        $counter = $descriptor->getCounter($name);

        if (!$counter) {
            $meta    = $descriptor->getMetadata();
            $counter = new SpeedRateCounter($minRate, $duration);
            $time    = $meta[RequestExecutorInterface::META_LAST_IO_START_TIME] ?:
                        $meta[RequestExecutorInterface::META_CONNECTION_FINISH_TIME];
            $counter->advance($time, 0);
            $descriptor->registerCounter($name, $counter);
        }

        return $counter;
    }

    /**
     * Process transfer bytes counter
     *
     * Resets the counter of the opposite direction, adds $bytes to the transferred
     * bytes metadata value, advances the rate counter for the current time and
     * stores the measured speed in metadata.
     *
     * @param string            $name Counter name operating this transfer
     * @param RequestDescriptor $descriptor Socket descriptor
     * @param int               $bytes Amount of bytes processed by transfer
     *
     * @return void
     * @throws \LogicException If $name is not a known transfer counter
     * @throws \Exception Object built by the factory configured for the counter
     *      (a SlowSpeedTransferException static method) when the counter raises \OverflowException
     */
    protected function handleTransferCounter($name, RequestDescriptor $descriptor, $bytes)
    {
        $meta = $descriptor->getMetadata();

        $info = $this->getTransferCounterMeta($name, $meta);

        $descriptor->resetCounter($info['resetCounter']);

        // Metadata is fetched again after the reset, before the values are updated
        $meta = $descriptor->getMetadata();
        $descriptor->setMetadata($info['bytesCounter'], $meta[$info['bytesCounter']] + $bytes);

        $counter = $descriptor->getCounter($name);
        try {
            if (!$counter) {
                $counter = $this->createRateCounter($name, $info['minSpeed'], $info['duration'], $descriptor);
            }

            $counter->advance(microtime(true), $bytes);
            // An empty measured speed keeps the previously stored value
            $descriptor->setMetadata(
                $info['speedCounter'],
                $counter->getCurrentSpeed() ?: $meta[$info['speedCounter']]
            );
        } catch (\OverflowException $e) {
            // Convert the counter overflow into the exception produced by the configured factory
            $callable = $info['exception'];

            throw $callable(
                $descriptor->getSocket(),
                $counter->getCurrentSpeed(),
                $counter->getCurrentDuration()
            );
        }
    }

    /**
     * Return metadata for transfer rate counter
     *
     * The counter name is validated before any metadata is read, and only the
     * values belonging to the requested direction are taken from $meta.
     *
     * @param string $name Counter name
     * @param array  $meta Socket metadata
     *
     * @return array Settings with keys: resetCounter, minSpeed, duration, bytesCounter, speedCounter, exception
     * @throws \LogicException If there are no settings for the given counter name
     */
    private function getTransferCounterMeta($name, array $meta)
    {
        if (!isset(self::$transferCounterMap[ $name ])) {
            throw new \LogicException('Can not process counter ' . $name . ' in transfer operation');
        }

        $settings = self::$transferCounterMap[ $name ];

        return [
            'resetCounter' => $settings['resetCounter'],
            'minSpeed'     => $meta[ $settings['minSpeedKey'] ],
            'duration'     => $meta[ $settings['durationKey'] ],
            'bytesCounter' => $settings['bytesCounter'],
            'speedCounter' => $settings['speedCounter'],
            'exception'    => $settings['exception'],
        ];
    }
}
230