Completed
Branch 0.4-dev (50fa9b)
by Evgenij
02:41
created

WriteIoHandler::dataToTraversable()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 13
cts 13
cp 1
rs 9.2
c 0
b 0
f 0
cc 4
eloc 10
nc 4
nop 1
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\ExecutionContext;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
use AsyncSockets\Socket\Io\IoInterface;
21
use AsyncSockets\Socket\SocketInterface;
22
23
/**
24
 * Class WriteIoHandler
25
 */
26
class WriteIoHandler extends AbstractOobHandler
27
{
28
    /**
     * {@inheritdoc}
     *
     * Accepts plain write operations as well as writes that are already in progress.
     */
    public function supports(OperationInterface $operation)
    {
        if ($operation instanceof WriteOperation) {
            return true;
        }

        return $operation instanceof InProgressWriteOperation;
    }
33
34
    /**
     * {@inheritdoc}
     *
     * Fires a WriteEvent for a fresh write operation (an in-progress write skips the event),
     * pushes the next portion of data to the socket and reports the number of bytes sent
     * to the send-rate counter.
     */
    protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler,
        ExecutionContext $executionContext
    ) {
        /** @var WriteOperation $currentOperation */
        $currentOperation = $descriptor->getOperation();
        $targetSocket     = $descriptor->getSocket();

        if ($currentOperation instanceof InProgressWriteOperation) {
            // Continuation of a partially sent payload: no event is fired and the
            // operation itself is handed over as the desired next operation.
            $desiredNext = $currentOperation;
        } else {
            $socketMeta = $executor->socketBag()->getSocketMetaData($targetSocket);
            $writeEvent = new WriteEvent(
                $currentOperation,
                $executor,
                $targetSocket,
                $socketMeta[RequestExecutorInterface::META_USER_CONTEXT]
            );

            $eventHandler->invokeEvent($writeEvent, $executor, $descriptor->getSocket(), $executionContext);
            // The event object carries the operation that should follow this write.
            $desiredNext = $writeEvent->getNextOperation();
        }

        // $sentBytes is populated by reference inside writeDataToSocket().
        $actualNext = $this->writeDataToSocket($currentOperation, $targetSocket, $desiredNext, $sentBytes);
        $this->handleTransferCounter(RequestDescriptor::COUNTER_SEND_MIN_RATE, $descriptor, $sentBytes);

        return $actualNext;
    }
66
67
    /**
     * Send the next pending chunk of the operation's data and decide which operation follows
     *
     * At most one chunk is written per call. If more chunks remain afterwards, the write is
     * represented by an InProgressWriteOperation so it can be resumed later; otherwise the
     * operation that was planned to follow the write is returned.
     *
     * @param WriteOperation     $operation Current write operation instance
     * @param SocketInterface    $socket Socket object
     * @param OperationInterface $nextOperation Desirable next operation
     * @param int                $bytesWritten Receives the amount of bytes written by this call
     *
     * @return OperationInterface Actual next operation
     */
    private function writeDataToSocket(
        WriteOperation $operation,
        SocketInterface $socket,
        OperationInterface $nextOperation = null,
        &$bytesWritten = null
    ) {
        $bytesWritten = 0;

        $chunks = $this->resolveDataIterator($operation);
        if ($chunks->valid()) {
            $chunk     = $chunks->current();
            $chunkSize = strlen($chunk);
            $sent      = $socket->write($chunk, $operation->isOutOfBand());

            if ($sent !== $chunkSize) {
                // Partial write: hand the unsent tail back to the iterator.
                $chunks->unread($chunkSize - $sent);
            }

            $bytesWritten = $sent;
            $chunks->next();

            if ($chunks->valid()) {
                // Data is still pending, so the write has to continue on a later pass.
                return $this->makeInProgressWriteOperation($operation, $nextOperation);
            }
        }

        if ($operation instanceof InProgressWriteOperation) {
            /** @var InProgressWriteOperation $operation */
            // Nothing left to send: continue with the operation stored inside the in-progress wrapper.
            return $operation->getNextOperation();
        }

        return $nextOperation;
    }
113
114
    /**
     * Provide the in-progress representation of a write operation
     *
     * An operation that is already an InProgressWriteOperation is returned untouched;
     * any other write operation is wrapped into a new InProgressWriteOperation that
     * keeps the planned next operation and the operation's data.
     *
     * @param WriteOperation     $operation Current operation object
     * @param OperationInterface $nextOperation Next planned operation
     *
     * @return InProgressWriteOperation Next operation object
     */
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
    {
        if ($operation instanceof InProgressWriteOperation) {
            return $operation;
        }

        return new InProgressWriteOperation($nextOperation, $operation->getData());
    }
131
132
    /**
     * Return iterator for writing operation
     *
     * If the operation's data is not a PushbackIterator yet, it is converted into one
     * (chunked by IoInterface::SOCKET_BUFFER_SIZE), rewound and stored back into the
     * operation, so subsequent calls reuse the same iterator.
     *
     * @param WriteOperation $operation The operation
     *
     * @return PushbackIterator
     * @throws \LogicException If operation data can not be converted to \Traversable
     */
    private function resolveDataIterator(WriteOperation $operation)
    {
        $result = $operation->getData();
        if (!($result instanceof PushbackIterator)) {
            $source = $this->dataToTraversable($result);
            if (!($source instanceof \Iterator)) {
                // dataToTraversable() only guarantees \Traversable, which also covers
                // \IteratorAggregate implementations and Traversable-only internal objects.
                // PushbackIterator is reported by static analysis to require an \Iterator,
                // so adapt such values instead of letting the constructor reject them.
                $source = new \IteratorIterator($source);
            }

            $result = new PushbackIterator($source, IoInterface::SOCKET_BUFFER_SIZE);
            $result->rewind();

            // Keep the iterator inside the operation so its position persists between calls.
            $operation->setData($result);
        }

        return $result;
    }
155
156
    /**
     * Converts data to Traversable object
     *
     * Non-object values are cast to array and wrapped into an \ArrayIterator;
     * objects are accepted only when they already implement \Traversable.
     *
     * @param mixed $data Data to convert into object
     *
     * @return \Traversable
     * @throws \LogicException If data can not be converted to \Traversable
     */
    private function dataToTraversable($data)
    {
        if (!is_object($data)) {
            return new \ArrayIterator((array) $data);
        }

        if ($data instanceof \Traversable) {
            return $data;
        }

        // Only non-traversable objects can reach this point, so the class name identifies the type.
        throw new \LogicException(
            sprintf(
                'Trying to send unexpected data type %s',
                get_class($data)
            )
        );
    }
182
}
183