Completed
Branch 0.4-dev (436465)
by Evgenij
02:24
created

WriteIoHandler::resolveDataIterator()   C

Complexity

Conditions 7
Paths 9

Size

Total Lines 30
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 7.9936

Importance

Changes 0
Metric Value
dl 0
loc 30
ccs 16
cts 22
cp 0.7272
rs 6.7272
c 0
b 0
f 0
cc 7
eloc 17
nc 9
nop 1
crap 7.9936
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
18
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
19
use AsyncSockets\Socket\Io\IoInterface;
20
use AsyncSockets\Socket\SocketInterface;
21
22
/**
23
 * Class WriteIoHandler. Pipeline handler that sends the data of write operations to their sockets.
24
 */
25
class WriteIoHandler extends AbstractOobHandler
26
{
27
    /**
     * {@inheritdoc}
     *
     * Accepts both fresh write operations and write operations that are already in progress.
     */
    public function supports(OperationInterface $operation)
    {
        if ($operation instanceof WriteOperation) {
            return true;
        }

        return $operation instanceof InProgressWriteOperation;
    }
32
33
    /**
     * {@inheritdoc}
     *
     * Fires a WriteEvent for a fresh write operation (an operation that is already in progress
     * is continued without an event), sends the next piece of data to the socket and passes the
     * amount of written bytes to handleTransferCounter() for the COUNTER_SEND_MIN_RATE counter.
     */
    protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    ) {
        /** @var WriteOperation $currentOperation */
        $currentOperation = $descriptor->getOperation();
        $socket           = $descriptor->getSocket();

        if ($currentOperation instanceof InProgressWriteOperation) {
            // Continuation of a partially sent payload: no event is fired,
            // the operation itself is the candidate for the next step.
            $desiredOperation = $currentOperation;
        } else {
            $metadata   = $executor->socketBag()->getSocketMetaData($socket);
            $writeEvent = new WriteEvent(
                $currentOperation,
                $executor,
                $socket,
                $metadata[RequestExecutorInterface::META_USER_CONTEXT]
            );
            $eventHandler->invokeEvent($writeEvent);
            $desiredOperation = $writeEvent->getNextOperation();
        }

        // Filled in by reference by writeDataToSocket().
        $sentBytes     = 0;
        $nextOperation = $this->writeDataToSocket($currentOperation, $socket, $desiredOperation, $sentBytes);
        $this->handleTransferCounter(RequestDescriptor::COUNTER_SEND_MIN_RATE, $descriptor, $sentBytes);

        return $nextOperation;
    }
64
65
    /**
     * Write the current chunk of operation data to the socket and decide which operation comes next
     *
     * At most one chunk (the current element of the data iterator) is written per call. If the socket
     * accepted fewer bytes than the chunk length, the difference is handed to the iterator via unread().
     * While the iterator still has data after advancing, an in-progress write operation is returned
     * so that the writing continues later.
     *
     * @param WriteOperation     $operation Current write operation instance
     * @param SocketInterface    $socket Socket object
     * @param OperationInterface $nextOperation Desirable next operation
     * @param int                $bytesWritten Amount of written bytes, filled in by reference
     *
     * @return OperationInterface Actual next operation
     */
    private function writeDataToSocket(
        WriteOperation $operation,
        SocketInterface $socket,
        OperationInterface $nextOperation = null,
        &$bytesWritten = null
    ) {
        $bytesWritten = 0;
        $dataIterator = $this->resolveDataIterator($operation);

        if ($dataIterator->valid()) {
            $chunk     = $dataIterator->current();
            $chunkSize = strlen($chunk);
            $sent      = $socket->write($chunk, $operation->isOutOfBand());
            if ($chunkSize !== $sent) {
                // Partial write: give the unsent tail back to the iterator.
                $dataIterator->unread($chunkSize - $sent);
            }

            $bytesWritten = $sent;
            $dataIterator->next();

            if ($dataIterator->valid()) {
                // There is more data to send, keep the operation alive as "in progress".
                return $this->makeInProgressWriteOperation($operation, $nextOperation);
            }
        }

        // Nothing is left to send: a finished in-progress operation yields the operation it was holding.
        return ($operation instanceof InProgressWriteOperation)
            ? $operation->getNextOperation()
            : $nextOperation;
    }
111
112
    /**
     * Return an in-progress variant of the given write operation
     *
     * An operation that is already an InProgressWriteOperation is returned as is; otherwise a new
     * InProgressWriteOperation is created from the planned next operation and the operation data.
     *
     * @param WriteOperation     $operation Current operation object
     * @param OperationInterface $nextOperation Next planned operation
     *
     * @return InProgressWriteOperation Next operation object
     */
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
    {
        if ($operation instanceof InProgressWriteOperation) {
            return $operation;
        }

        return new InProgressWriteOperation($nextOperation, $operation->getData());
    }
129
130
    /**
     * Return iterator for writing operation
     *
     * Data that is already a PushbackIterator is returned untouched. Otherwise null and scalar values
     * are cast to an array, arrays are wrapped into \ArrayIterator, and the resulting traversable is
     * wrapped into a rewound PushbackIterator, which is also stored back into the operation.
     *
     * @param WriteOperation $operation The operation
     *
     * @return PushbackIterator
     * @throws \LogicException If operation data is neither null, scalar, array nor \Traversable
     */
    private function resolveDataIterator(WriteOperation $operation)
    {
        $result = $operation->getData();
        if ($result instanceof PushbackIterator) {
            return $result;
        }

        $nested = $result;
        if ($nested === null || is_scalar($nested)) {
            $nested = (array) $nested;
        }

        if (is_array($nested)) {
            $nested = new \ArrayIterator($nested);
        }

        if (!($nested instanceof \Traversable)) {
            throw new \LogicException(
                sprintf(
                    'Trying to send unexpected data type %s',
                    is_object($nested) ? get_class($nested) : gettype($nested)
                )
            );
        }

        if (!($nested instanceof \Iterator)) {
            // \Traversable also covers \IteratorAggregate implementations, which are not \Iterator.
            // Static analysis reported that PushbackIterator expects an \Iterator here, so such
            // objects are adapted through \IteratorIterator instead of being passed as is.
            $nested = new \IteratorIterator($nested);
        }

        $result = new PushbackIterator($nested, IoInterface::SOCKET_BUFFER_SIZE);
        $result->rewind();

        // Remember the wrapper so that subsequent calls continue from the same position.
        $operation->setData($result);

        return $result;
    }
167
}
168