Completed
Branch 0.4-dev (f926df)
by Evgenij
04:00
created

WriteIoHandler::resolveDataIterator()   C

Complexity

Conditions 7
Paths 9

Size

Total Lines 30
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 7.9936

Importance

Changes 0
Metric Value
dl 0
loc 30
ccs 16
cts 22
cp 0.7272
rs 6.7272
c 0
b 0
f 0
cc 7
eloc 17
nc 9
nop 1
crap 7.9936
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
18
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
19
use AsyncSockets\Socket\Io\IoInterface;
20
use AsyncSockets\Socket\SocketInterface;
21
22
/**
23
 * Class WriteIoHandler
24
 */
25
class WriteIoHandler extends AbstractOobHandler
26
{
27
    /** {@inheritdoc} */
28 42
    public function supports(OperationInterface $operation)
29
    {
30 42
        return ($operation instanceof WriteOperation) || ($operation instanceof InProgressWriteOperation);
31
    }
32
33
    /** {@inheritdoc} */
34 46
    protected function handleOperation(
35
        RequestDescriptor $descriptor,
36
        RequestExecutorInterface $executor,
37
        EventHandlerInterface $eventHandler
38
    ) {
39 46
        $operation = $descriptor->getOperation();
40 46
        $socket    = $descriptor->getSocket();
41
42
        /** @var WriteOperation $operation */
43 46
        $fireEvent = !($operation instanceof InProgressWriteOperation);
44
45 46
        if ($fireEvent) {
46 45
            $meta  = $executor->socketBag()->getSocketMetaData($socket);
47 45
            $event = new WriteEvent(
48 45
                $operation,
49 45
                $executor,
50 45
                $socket,
51 45
                $meta[ RequestExecutorInterface::META_USER_CONTEXT ]
52 45
            );
53 45
            $eventHandler->invokeEvent($event);
54 30
            $nextOperation = $event->getNextOperation();
55 30
        } else {
56 1
            $nextOperation = $operation;
57
        }
58
59 31
        $result = $this->writeDataToSocket($operation, $socket, $nextOperation, $bytesWritten);
60 28
        $this->handleTransferCounter(RequestDescriptor::COUNTER_SEND_MIN_RATE, $descriptor, $bytesWritten);
61
62 28
        return $result;
63
    }
64
65
    /**
66
     * Perform data writing to socket and return suitable next socket operation
67
     *
68
     * @param WriteOperation     $operation Current write operation instance
69
     * @param SocketInterface    $socket Socket object
70
     * @param OperationInterface $nextOperation Desirable next operation
71
     * @param int                $bytesWritten Amount of written bytes
72
     *
73
     * @return OperationInterface Actual next operation
74
     */
75 31
    private function writeDataToSocket(
76
        WriteOperation $operation,
77
        SocketInterface $socket,
78
        OperationInterface $nextOperation = null,
79
        &$bytesWritten = null
80
    ) {
81 31
        $result               = $nextOperation;
82 31
        $extractNextOperation = true;
83 31
        $bytesWritten         = 0;
84
85 31
        $iterator = $this->resolveDataIterator($operation);
86 31
        if ($iterator->valid()) {
87 9
            $data     = $iterator->current();
88 9
            $length   = strlen($data);
89 9
            $written  = $socket->write($data, $operation->isOutOfBand());
90 6
            if ($length !== $written) {
91 2
                $iterator->unread($length - $written);
92 2
            }
93
94 6
            $bytesWritten = $written;
95 6
            $iterator->next();
96
97 6
            if ($iterator->valid()) {
98 2
                $extractNextOperation = false;
99 2
                $operation = $this->makeInProgressWriteOperation($operation, $nextOperation);
100 2
                $result    = $operation;
101 2
            }
102 6
        }
103
104 28
        if ($extractNextOperation && ($operation instanceof InProgressWriteOperation)) {
105
            /** @var InProgressWriteOperation $operation */
106 1
            $result = $operation->getNextOperation();
107 1
        }
108
109 28
        return $result;
110
    }
111
112
    /**
113
     * Marks write operation as in progress
114
     *
115
     * @param WriteOperation     $operation Current operation object
116
     * @param OperationInterface $nextOperation Next planned operation
117
     *
118
     * @return InProgressWriteOperation Next operation object
119
     */
120 2
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
121
    {
122 2
        $result = $operation;
123 2
        if (!($result instanceof InProgressWriteOperation)) {
124 2
            $result = new InProgressWriteOperation($nextOperation, $operation->getData());
125 2
        }
126
127 2
        return $result;
128
    }
129
130
    /**
131
     * Return iterator for writing operation
132
     *
133
     * @param WriteOperation $operation The operation
134
     *
135
     * @return PushbackIterator
136
     */
137 31
    private function resolveDataIterator(WriteOperation $operation)
138
    {
139 31
        $result = $operation->getData();
140 31
        if (!($result instanceof PushbackIterator)) {
141 27
            $nested = $result;
142 27
            if ($nested === null || is_scalar($nested)) {
143 27
                $nested = (array) $nested;
144 27
            }
145
146 27
            if (is_array($nested)) {
147 27
                $nested = new \ArrayIterator($nested);
148 27
            }
149
150 27
            if (!($nested instanceof \Traversable)) {
151
                throw new \LogicException(
152
                    sprintf(
153
                        'Trying to send unexpected data type %s',
154
                        is_object($nested) ? get_class($nested) : gettype($nested)
155
                    )
156
                );
157
            }
158
159 27
            $result = new PushbackIterator($nested, IoInterface::SOCKET_BUFFER_SIZE);
160 27
            $result->rewind();
161
162 27
            $operation->setData($result);
163 27
        }
164
165 31
        return $result;
166
    }
167
}
168