Completed
Branch 0.4-dev (dcd04a)
by Evgenij
02:34
created

AbstractOobHandler   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 193
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 98.28%

Importance

Changes 0
Metric Value
wmc 15
lcom 1
cbo 6
dl 0
loc 193
ccs 57
cts 58
cp 0.9828
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A handle() 0 19 3
handleOperation() 0 5 ?
A getInvokeState() 0 6 2
B handleOobData() 0 27 2
A createRateCounter() 0 15 3
B handleTransferCounter() 0 55 5
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\ReadEvent;
13
use AsyncSockets\Frame\RawFramePicker;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\RequestExecutor\EventHandlerInterface;
16
use AsyncSockets\RequestExecutor\IoHandlerInterface;
17
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
18
use AsyncSockets\RequestExecutor\Metadata\SpeedRateCounter;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
21
/**
22
 * Class AbstractOobHandler
23
 */
24
abstract class AbstractOobHandler implements IoHandlerInterface
25
{
26
    /**
27
     * Operation to descriptor state map
28
     *
29
     * @var array
30
     */
31
    private static $stateMap = [
32
        OperationInterface::OPERATION_READ  => RequestDescriptor::RDS_READ,
33
        OperationInterface::OPERATION_WRITE => RequestDescriptor::RDS_WRITE,
34
    ];
35
36
    /**
37
     * {@inheritdoc}
38
     */
39 70
    final public function handle(
40
        RequestDescriptor $descriptor,
41
        RequestExecutorInterface $executor,
42
        EventHandlerInterface $eventHandler
43
    ) {
44 70
        $result = $this->handleOobData($descriptor, $executor, $eventHandler);
45 70
        if ($result) {
46 5
            return $result;
47
        }
48
49 65
        $state = $this->getInvokeState($descriptor->getOperation());
50 65
        if ($descriptor->hasState($state)) {
51 60
            $descriptor->clearState($state);
52
53 60
            $result = $this->handleOperation($descriptor, $executor, $eventHandler);
54
        }
55
56 41
        return $result;
57
    }
58
59
    /**
60
     * Process given operation
61
     *
62
     * @param RequestDescriptor        $descriptor Request descriptor
63
     * @param RequestExecutorInterface $executor Executor, processing operation
64
     * @param EventHandlerInterface    $eventHandler Event handler for this operation
65
     *
66
     * @return OperationInterface|null Next operation to pass in socket. Return null,
67
     *      if next operation is not required. Return $operation parameter, if operation is not completed yet
68
     */
69
    abstract protected function handleOperation(
70
        RequestDescriptor $descriptor,
71
        RequestExecutorInterface $executor,
72
        EventHandlerInterface $eventHandler
73
    );
74
75
    /**
76
     * Return state which should invoke this operation
77
     *
78
     * @param OperationInterface $operation Operation object
79
     *
80
     * @return int RequestDescriptor::RDS_* constant
81
     */
82 65
    private function getInvokeState(OperationInterface $operation)
83
    {
84 65
        $type = $operation->getType();
85
86 65
        return isset(self::$stateMap[$type]) ? self::$stateMap[$type] : 0;
87
    }
88
89
    /**
90
     * Handle OOB data
91
     *
92
     * @param RequestDescriptor        $descriptor Request descriptor
93
     * @param RequestExecutorInterface $executor Executor, processing operation
94
     * @param EventHandlerInterface    $eventHandler Event handler for this operation
95
     *
96
     * @return OperationInterface|null Operation to return to user or null to continue normal processing
97
     */
98 70
    private function handleOobData(
99
        RequestDescriptor $descriptor,
100
        RequestExecutorInterface $executor,
101
        EventHandlerInterface $eventHandler
102
    ) {
103 70
        if (!$descriptor->hasState(RequestDescriptor::RDS_OOB)) {
104 60
            return null;
105
        }
106
107 10
        $descriptor->clearState(RequestDescriptor::RDS_OOB);
108
109 10
        $picker = new RawFramePicker();
110 10
        $socket = $descriptor->getSocket();
111 10
        $meta   = $descriptor->getMetadata();
112 10
        $frame  = $socket->read($picker, true);
113 10
        $event  = new ReadEvent(
114
            $executor,
115
            $socket,
116 10
            $meta[ RequestExecutorInterface::META_USER_CONTEXT ],
117
            $frame,
118 10
            true
119
        );
120
121 10
        $eventHandler->invokeEvent($event);
122
123 10
        return $event->getNextOperation();
124
    }
125
126
    /**
127
     * Creates transfer rate counter with given parameters
128
     *
129
     * @param string            $name Counter name to create
130
     * @param int               $minRate Minimum speed setting for counter
131
     * @param int               $duration Max duration of low speed
132
     * @param RequestDescriptor $descriptor Request descriptor for counter
133
     *
134
     * @return SpeedRateCounter
135
     */
136 46
    private function createRateCounter($name, $minRate, $duration, RequestDescriptor $descriptor)
137
    {
138 46
        $counter = $descriptor->getCounter($name);
139
140 46
        if (!$counter) {
141 46
            $meta    = $descriptor->getMetadata();
142 46
            $counter = new SpeedRateCounter($minRate, $duration);
143 46
            $time    = $meta[RequestExecutorInterface::META_LAST_IO_START_TIME] ?:
144 46
                        $meta[RequestExecutorInterface::META_CONNECTION_FINISH_TIME];
145 46
            $counter->advance($time, 0);
146 46
            $descriptor->registerCounter($name, $counter);
147
        }
148
149 46
        return $counter;
150
    }
151
152
    /**
153
     * Process transfer bytes counter
154
     *
155
     * @param string            $name Counter name operating this transfer
156
     * @param RequestDescriptor $descriptor Socket descriptor
157
     * @param int               $bytes Amount of bytes processed by transfer
158
     *
159
     * @return void
160
     */
161 46
    protected function handleTransferCounter($name, RequestDescriptor $descriptor, $bytes)
162
    {
163 46
        $meta = $descriptor->getMetadata();
164
165
        $map = [
166
            RequestDescriptor::COUNTER_RECV_MIN_RATE => [
167
                'resetCounter' => RequestDescriptor::COUNTER_SEND_MIN_RATE,
168
                'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED ],
169
                'duration'     => $meta[ RequestExecutorInterface::META_MIN_RECEIVE_SPEED_DURATION ],
170
                'bytesCounter' => RequestExecutorInterface::META_BYTES_RECEIVED,
171
                'speedCounter' => RequestExecutorInterface::META_RECEIVE_SPEED,
172
                'exception'    => ['AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataReceiving'],
173 46
            ],
174
            RequestDescriptor::COUNTER_SEND_MIN_RATE => [
175
                'resetCounter' => RequestDescriptor::COUNTER_RECV_MIN_RATE,
176
                'minSpeed'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED ],
177
                'duration'     => $meta[ RequestExecutorInterface::META_MIN_SEND_SPEED_DURATION ],
178
                'bytesCounter' => RequestExecutorInterface::META_BYTES_SENT,
179
                'speedCounter' => RequestExecutorInterface::META_SEND_SPEED,
180
                'exception'    => ['AsyncSockets\Exception\SlowSpeedTransferException', 'tooSlowDataSending'],
181
            ],
182
        ];
183
184 46
        if (!isset($map[$name])) {
185
            throw new \LogicException('Can not process counter ' . $name . ' in transfer operation');
186
        }
187
188 46
        $info = $map[$name];
189
190 46
        $descriptor->resetCounter($info['resetCounter']);
191
192 46
        $meta = $descriptor->getMetadata();
193 46
        $descriptor->setMetadata($info['bytesCounter'], $meta[$info['bytesCounter']] + $bytes);
194
195 46
        $counter = $descriptor->getCounter($name);
196
        try {
197 46
            if (!$counter) {
198 46
                $counter = $this->createRateCounter($name, $info['minSpeed'], $info['duration'], $descriptor);
199
            }
200
201 46
            $counter->advance(microtime(true), $bytes);
202 45
            $descriptor->setMetadata(
203 45
                $info['speedCounter'],
204 45
                $counter->getCurrentSpeed() ?: $meta[$info['speedCounter']]
205
            );
206 1
        } catch (\OverflowException $e) {
207 1
            $callable = $info['exception'];
208
209 1
            throw $callable(
210 1
                $descriptor->getSocket(),
211 1
                $counter->getCurrentSpeed(),
212 1
                $counter->getCurrentDuration()
213
            );
214
        }
215 45
    }
216
}
217