Completed
Branch 0.4-dev (d25c80)
by Evgenij
03:42
created

WriteIoHandler   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 144
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 19
lcom 1
cbo 7
dl 0
loc 144
ccs 73
cts 73
cp 1
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A supports() 0 4 2
B handleOperation() 0 31 2
B writeDataToSocket() 0 36 6
A makeInProgressWriteOperation() 0 9 2
C resolveDataIterator() 0 30 7
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\ExecutionContext;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
use AsyncSockets\Socket\Io\IoInterface;
21
use AsyncSockets\Socket\SocketInterface;
22
23
/**
 * Pipeline handler that processes write operations: it sends the pending data chunk
 * to the socket and decides which operation should be executed next.
 */
class WriteIoHandler extends AbstractOobHandler
{
    /** {@inheritdoc} */
    public function supports(OperationInterface $operation)
    {
        if ($operation instanceof WriteOperation) {
            return true;
        }

        return $operation instanceof InProgressWriteOperation;
    }

    /** {@inheritdoc} */
    protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler,
        ExecutionContext $executionContext
    ) {
        /** @var WriteOperation $currentOperation */
        $currentOperation = $descriptor->getOperation();
        $socket           = $descriptor->getSocket();

        if ($currentOperation instanceof InProgressWriteOperation) {
            // A write that is already in progress does not fire the write event again;
            // the operation itself is passed on as the desired next operation.
            $desiredNext = $currentOperation;
        } else {
            $metadata   = $executor->socketBag()->getSocketMetaData($socket);
            $writeEvent = new WriteEvent(
                $currentOperation,
                $executor,
                $socket,
                $metadata[ RequestExecutorInterface::META_USER_CONTEXT ]
            );

            $eventHandler->invokeEvent($writeEvent, $executor, $descriptor->getSocket(), $executionContext);
            $desiredNext = $writeEvent->getNextOperation();
        }

        $actualNext = $this->writeDataToSocket($currentOperation, $socket, $desiredNext, $sentBytes);
        $this->handleTransferCounter(RequestDescriptor::COUNTER_SEND_MIN_RATE, $descriptor, $sentBytes);

        return $actualNext;
    }

    /**
     * Writes the current data chunk of the operation into the socket and picks the operation to run next.
     *
     * @param WriteOperation          $operation Write operation being processed
     * @param SocketInterface         $socket Socket to write into
     * @param OperationInterface|null $nextOperation Operation requested to follow this write
     * @param int                     $bytesWritten Output argument: receives the value returned
     *                                              by the socket write call, or 0 when nothing was sent
     *
     * @return OperationInterface|null An in-progress write operation while the iterator still has data;
     *                                 otherwise the operation stored inside the in-progress write,
     *                                 or $nextOperation for a plain write
     */
    private function writeDataToSocket(
        WriteOperation $operation,
        SocketInterface $socket,
        OperationInterface $nextOperation = null,
        &$bytesWritten = null
    ) {
        // Reset the output argument before anything below has a chance to throw.
        $bytesWritten = 0;

        $chunks = $this->resolveDataIterator($operation);
        if ($chunks->valid()) {
            $chunk     = $chunks->current();
            $chunkSize = strlen($chunk);
            $sent      = $socket->write($chunk, $operation->isOutOfBand());

            if ($sent !== $chunkSize) {
                // Partial write: report the size of the unsent remainder back to the iterator.
                $chunks->unread($chunkSize - $sent);
            }

            $bytesWritten = $sent;
            $chunks->next();

            if ($chunks->valid()) {
                // More data is pending, so the write has to be continued later.
                return $this->makeInProgressWriteOperation($operation, $nextOperation);
            }
        }

        // Nothing left to send: an in-progress write hands over to the operation it carries.
        if ($operation instanceof InProgressWriteOperation) {
            return $operation->getNextOperation();
        }

        return $nextOperation;
    }

    /**
     * Returns an in-progress representation of the given write operation.
     *
     * @param WriteOperation          $operation Operation being written
     * @param OperationInterface|null $nextOperation Operation planned after the write completes
     *
     * @return InProgressWriteOperation The operation itself when it is already in progress,
     *                                  otherwise a new instance built from $nextOperation and the operation data
     */
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
    {
        if ($operation instanceof InProgressWriteOperation) {
            return $operation;
        }

        return new InProgressWriteOperation($nextOperation, $operation->getData());
    }

    /**
     * Provides the pushback iterator over the operation data, creating it on first use.
     *
     * Null, scalar and array data are wrapped into an \ArrayIterator; any other \Traversable is used as is.
     * A newly created iterator is rewound and stored back into the operation.
     *
     * @param WriteOperation $operation Operation holding the data to send
     *
     * @return PushbackIterator
     * @throws \LogicException When the data is neither null, scalar, array nor \Traversable
     */
    private function resolveDataIterator(WriteOperation $operation)
    {
        $data = $operation->getData();
        if ($data instanceof PushbackIterator) {
            return $data;
        }

        if (is_array($data)) {
            $source = new \ArrayIterator($data);
        } elseif ($data === null || is_scalar($data)) {
            // The cast turns null into an empty array and a scalar into a one-element array.
            $source = new \ArrayIterator((array) $data);
        } else {
            $source = $data;
        }

        if (!($source instanceof \Traversable)) {
            throw new \LogicException(
                sprintf(
                    'Trying to send unexpected data type %s',
                    is_object($source) ? get_class($source) : gettype($source)
                )
            );
        }

        $iterator = new PushbackIterator($source, IoInterface::SOCKET_BUFFER_SIZE);
        $iterator->rewind();

        $operation->setData($iterator);

        return $iterator;
    }
}
170