WriteIoHandler::writeDataToSocket()   B
last analyzed

Complexity

Conditions 6
Paths 10

Size

Total Lines 36
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 36
ccs 24
cts 24
cp 1
rs 8.439
c 0
b 0
f 0
cc 6
eloc 24
nc 10
nop 4
crap 6
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\ExecutionContext;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
use AsyncSockets\Socket\Io\IoInterface;
21
use AsyncSockets\Socket\SocketInterface;
22
23
/**
24
 * Class WriteIoHandler
25
 */
26
class WriteIoHandler extends AbstractOobHandler
27
{
28
    /** {@inheritdoc} */
29 42
    public function supports(OperationInterface $operation)
30
    {
31 42
        return ($operation instanceof WriteOperation) || ($operation instanceof InProgressWriteOperation);
32
    }
33
34
    /** {@inheritdoc} */
35 47
    protected function handleOperation(
36
        OperationInterface $operation,
37
        RequestDescriptor $descriptor,
38
        RequestExecutorInterface $executor,
39
        EventHandlerInterface $eventHandler,
40
        ExecutionContext $executionContext
41
    ) {
42 47
        $socket = $descriptor->getSocket();
43
44
        /** @var WriteOperation $operation */
45 47
        $fireEvent = !($operation instanceof InProgressWriteOperation);
46
47 47
        if ($fireEvent) {
48 46
            $meta  = $executor->socketBag()->getSocketMetaData($socket);
49 46
            $event = new WriteEvent(
50 46
                $operation,
51 46
                $executor,
52 46
                $socket,
53 46
                $meta[ RequestExecutorInterface::META_USER_CONTEXT ]
54 46
            );
55 46
            $eventHandler->invokeEvent($event, $executor, $descriptor->getSocket(), $executionContext);
56 31
            $nextOperation = $event->getNextOperation();
57 31
        } else {
58 1
            $nextOperation = $operation;
59
        }
60
61 32
        $result = $this->writeDataToSocket($operation, $socket, $nextOperation, $bytesWritten);
62 28
        $this->handleTransferCounter(RequestDescriptor::COUNTER_SEND_MIN_RATE, $descriptor, $bytesWritten);
63
64 28
        return $result;
65
    }
66
67
    /**
68
     * Perform data writing to socket and return suitable next socket operation
69
     *
70
     * @param WriteOperation     $operation Current write operation instance
71
     * @param SocketInterface    $socket Socket object
72
     * @param OperationInterface $nextOperation Desirable next operation
73
     * @param int                $bytesWritten Amount of written bytes
74
     *
75
     * @return OperationInterface Actual next operation
76
     */
77 32
    private function writeDataToSocket(
78
        WriteOperation $operation,
79
        SocketInterface $socket,
80
        OperationInterface $nextOperation = null,
81
        &$bytesWritten = null
82
    ) {
83 32
        $result               = $nextOperation;
84 32
        $extractNextOperation = true;
85 32
        $bytesWritten         = 0;
86
87 32
        $iterator = $this->resolveDataIterator($operation);
88 31
        if ($iterator->valid()) {
89 9
            $data     = $iterator->current();
90 9
            $length   = strlen($data);
91 9
            $written  = $socket->write($data, $operation->isOutOfBand());
92 6
            if ($length !== $written) {
93 2
                $iterator->unread($length - $written);
94 2
            }
95
96 6
            $bytesWritten = $written;
97 6
            $iterator->next();
98
99 6
            if ($iterator->valid()) {
100 2
                $extractNextOperation = false;
101 2
                $operation = $this->makeInProgressWriteOperation($operation, $nextOperation);
102 2
                $result    = $operation;
103 2
            }
104 6
        }
105
106 28
        if ($extractNextOperation && ($operation instanceof InProgressWriteOperation)) {
107
            /** @var InProgressWriteOperation $operation */
108 1
            $result = $operation->getNextOperation();
109 1
        }
110
111 28
        return $result;
112
    }
113
114
    /**
115
     * Marks write operation as in progress
116
     *
117
     * @param WriteOperation     $operation Current operation object
118
     * @param OperationInterface $nextOperation Next planned operation
119
     *
120
     * @return InProgressWriteOperation Next operation object
121
     */
122 2
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
123
    {
124 2
        $result = $operation;
125 2
        if (!($result instanceof InProgressWriteOperation)) {
126 2
            $result = new InProgressWriteOperation($nextOperation, $operation->getData());
127 2
        }
128
129 2
        return $result;
130
    }
131
132
    /**
133
     * Return iterator for writing operation
134
     *
135
     * @param WriteOperation $operation The operation
136
     *
137
     * @return PushbackIterator
138
     */
139 32
    private function resolveDataIterator(WriteOperation $operation)
140
    {
141 32
        $result = $operation->getData();
142 32
        if (!($result instanceof PushbackIterator)) {
143 28
            $result = new PushbackIterator(
144 28
                $this->dataToIterator($result),
145
                IoInterface::SOCKET_BUFFER_SIZE
146 27
            );
147
148 27
            $result->rewind();
149
150 27
            $operation->setData($result);
151 27
        }
152
153 31
        return $result;
154
    }
155
156
    /**
157
     * Converts data to Traversable object
158
     *
159
     * @param mixed $data Data to convert into object
160
     *
161
     * @return \Iterator
162
     * @throws \LogicException If data can not be converted to \Traversable
163
     */
164 28
    private function dataToIterator($data)
165
    {
166 28
        switch (true) {
167 28
            case !is_object($data):
168 27
                return new \ArrayIterator((array) $data);
169 1
            case $data instanceof \Iterator:
170
                return $data;
171 1
            case $data instanceof \Traversable:
172
                return new \IteratorIterator($data);
173 1
            default:
174 1
                throw new \LogicException(
175 1
                    sprintf(
176 1
                        'Trying to send unexpected data type %s',
177 1
                        is_object($data) ? get_class($data) : gettype($data)
178 1
                    )
179 1
                );
180 1
        }
181
    }
182
183
    /**
184
     * @inheritDoc
185
     */
186 48
    protected function getHandlerType()
187
    {
188 48
        return RequestDescriptor::RDS_WRITE;
189
    }
190
}
191