IoStage::processStage()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 21
ccs 14
cts 14
cp 1
rs 9.0534
c 0
b 0
f 0
cc 4
eloc 12
nc 4
nop 1
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\EventType;
13
use AsyncSockets\Exception\NetworkSocketException;
14
use AsyncSockets\Exception\SocketException;
15
use AsyncSockets\Operation\NullOperation;
16
use AsyncSockets\Operation\OperationInterface;
17
use AsyncSockets\RequestExecutor\ExecutionContext;
18
use AsyncSockets\RequestExecutor\IoHandlerInterface;
19
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
20
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
21
22
/**
23
 * Class IoStage
24
 */
25
class IoStage extends AbstractTimeAwareStage
26
{
27
    /**
28
     * Handlers for processing I/O
29
     *
30
     * @var IoHandlerInterface
31
     */
32
    private $ioHandler;
33
34
    /**
35
     * IoStage constructor.
36
     *
37
     * @param RequestExecutorInterface $executor         Request executor
38
     * @param EventCaller              $eventCaller      Event caller
39
     * @param ExecutionContext         $executionContext Execution context
40
     * @param IoHandlerInterface       $ioHandler        Operation handler
41
     */
42 148
    public function __construct(
43
        RequestExecutorInterface $executor,
44
        EventCaller $eventCaller,
45
        ExecutionContext $executionContext,
46
        IoHandlerInterface $ioHandler
47
    ) {
48 148
        parent::__construct($executor, $eventCaller, $executionContext);
49 148
        $this->ioHandler = $ioHandler;
50 148
    }
51
52
    /** {@inheritdoc} */
53 114
    public function processStage(array $requestDescriptors)
54
    {
55
        /** @var RequestDescriptor[] $requestDescriptors */
56 114
        $result = [];
57 114
        foreach ($requestDescriptors as $descriptor) {
58 108
            if (!$this->setConnectionFinishTime($descriptor)) {
59 9
                $result[] = $descriptor;
60 9
                continue;
61
            }
62
63 95
            $handler       = $this->requireIoHandler($descriptor);
64 93
            $nextOperation = $this->handleIoOperation($descriptor, $handler);
65 61
            $isComplete    = $this->resolveNextOperation($descriptor, $nextOperation);
66
67 61
            if ($isComplete) {
68 51
                $result[] = $descriptor;
69 51
            }
70 72
        }
71
72 72
        return $result;
73
    }
74
75
    /**
76
     * Resolves I/O operation type and process it
77
     *
78
     * @param RequestDescriptor $requestDescriptor Operation object
79
     *
80
     * @return IoHandlerInterface Flag, whether operation is complete
81
     * @throws \LogicException
82
     */
83 95
    private function requireIoHandler(RequestDescriptor $requestDescriptor)
84
    {
85 95
        $operation = $requestDescriptor->getOperation();
86 95
        if (!$this->ioHandler->supports($operation)) {
87 2
            throw new \LogicException('There is no handler able to process ' . get_class($operation) . ' operation.');
88
        }
89
90 93
        return $this->ioHandler;
91
    }
92
93
    /**
94
     * Process I/O
95
     *
96
     * @param RequestDescriptor  $requestDescriptor
97
     * @param IoHandlerInterface $ioHandler
98
     *
99
     * @return OperationInterface
100
     */
101 93
    private function handleIoOperation(RequestDescriptor $requestDescriptor, IoHandlerInterface $ioHandler)
102
    {
103
        try {
104 93
            $this->eventCaller->setCurrentOperation($requestDescriptor);
105 93
            $result = $ioHandler->handle(
106 93
                $requestDescriptor->getOperation(),
107 93
                $requestDescriptor,
108 93
                $this->executor,
109 93
                $this->eventCaller,
110 93
                $this->executionContext
111 93
            );
112 55
            $this->eventCaller->clearCurrentOperation();
113
114 55
            return $result ?: NullOperation::getInstance();
115 38
        } catch (NetworkSocketException $e) {
116 6
            $this->callExceptionSubscribers($requestDescriptor, $e);
117 6
            return NullOperation::getInstance();
118
        }
119
    }
120
121
    /**
122
     * Fill next operation in given object and return flag indicating whether operation is required
123
     *
124
     * @param RequestDescriptor  $requestDescriptor Request descriptor object
125
     * @param OperationInterface $nextOperation Next operation object
126
     *
127
     * @return bool True if given operation is complete
128
     */
129 61
    private function resolveNextOperation(
130
        RequestDescriptor $requestDescriptor,
131
        OperationInterface $nextOperation
132
    ) {
133 61
        if ($nextOperation instanceof NullOperation) {
134 51
            $requestDescriptor->setOperation($nextOperation);
135 51
            return true;
136
        }
137
138 10
        if ($requestDescriptor->getOperation() === $nextOperation) {
139 1
            return false;
140
        }
141
142 9
        $requestDescriptor->setOperation($nextOperation);
143 9
        $requestDescriptor->setMetadata(
144
            [
145 9
                RequestExecutorInterface::META_LAST_IO_START_TIME => null,
146
            ]
147 9
        );
148
149 9
        return false;
150
    }
151
152
    /**
153
     * Set connection finish time and fire socket if it was not connected
154
     *
155
     * @param RequestDescriptor $requestDescriptor
156
     *
157
     * @return bool True, if there was no error, false if operation should be stopped
158
     */
159 108
    private function setConnectionFinishTime(RequestDescriptor $requestDescriptor)
160
    {
161 108
        $meta         = $requestDescriptor->getMetadata();
162 108
        $wasConnected = $meta[ RequestExecutorInterface::META_CONNECTION_FINISH_TIME ] !== null;
163 108
        $this->setSocketOperationTime($requestDescriptor, RequestExecutorInterface::META_CONNECTION_FINISH_TIME);
164 108
        if (!$wasConnected) {
165 99
            $event = $this->createEvent($requestDescriptor, EventType::CONNECTED);
166
167
            try {
168 99
                $this->callSocketSubscribers($requestDescriptor, $event);
169 99
            } catch (SocketException $e) {
170 9
                $this->callExceptionSubscribers($requestDescriptor, $e);
171 9
                return false;
172
            }
173 86
        }
174
175 95
        return true;
176
    }
177
}
178