Completed
Branch 0.4-dev (54e67b)
by Evgenij
02:52
created

WriteIoHandler::writeDataToSocket()   B

Complexity

Conditions 5
Paths 6

Size

Total Lines 35
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 5

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 35
ccs 22
cts 22
cp 1
rs 8.439
cc 5
eloc 22
nc 6
nop 4
crap 5
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\WriteEvent;
13
use AsyncSockets\Operation\InProgressWriteOperation;
14
use AsyncSockets\Operation\OperationInterface;
15
use AsyncSockets\Operation\WriteOperation;
16
use AsyncSockets\RequestExecutor\EventHandlerInterface;
17
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
18
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
19
use AsyncSockets\Socket\SocketInterface;
20
21
/**
 * Class WriteIoHandler
 *
 * Pipeline handler for WriteOperation and InProgressWriteOperation requests:
 * it pushes the operation's data to the socket and accounts for the bytes sent
 * in the request descriptor metadata.
 */
class WriteIoHandler extends AbstractOobHandler
{
    /** {@inheritdoc} */
    public function supports(OperationInterface $operation)
    {
        if ($operation instanceof WriteOperation) {
            return true;
        }

        return $operation instanceof InProgressWriteOperation;
    }

    /** {@inheritdoc} */
    protected function handleOperation(
        RequestDescriptor $descriptor,
        RequestExecutorInterface $executor,
        EventHandlerInterface $eventHandler
    ) {
        $operation = $descriptor->getOperation();
        $socket    = $descriptor->getSocket();

        /** @var WriteOperation $operation */
        if ($operation instanceof InProgressWriteOperation) {
            // An in-progress write does not fire a WriteEvent; the operation
            // itself is passed on as the desirable next operation.
            $nextOperation = $operation;
        } else {
            $socketMeta = $executor->socketBag()->getSocketMetaData($socket);
            $writeEvent = new WriteEvent(
                $operation,
                $executor,
                $socket,
                $socketMeta[ RequestExecutorInterface::META_USER_CONTEXT ]
            );
            $eventHandler->invokeEvent($writeEvent);

            // The next operation is read from the event after the handler was invoked.
            $nextOperation = $writeEvent->getNextOperation();
        }

        $result = $this->writeDataToSocket($operation, $socket, $nextOperation, $bytesWritten);

        // Add the bytes written by this call to the sent-bytes counter kept in the descriptor.
        $descriptorMeta = $descriptor->getMetadata();
        $bytesSentSoFar = $descriptorMeta[RequestExecutorInterface::META_BYTES_SENT];
        $descriptor->setMetadata(
            RequestExecutorInterface::META_BYTES_SENT,
            $bytesSentSoFar + $bytesWritten
        );

        return $result;
    }

    /**
     * Writes the operation's data (if it has any) to the socket and decides which
     * operation should follow.
     *
     * When the byte count returned by the socket differs from the data length, the
     * unsent tail is stored in an InProgressWriteOperation and that object is returned.
     * Otherwise, an InProgressWriteOperation yields its own stored next operation and
     * any other write operation yields $nextOperation.
     *
     * @param WriteOperation     $operation Current write operation instance
     * @param SocketInterface    $socket Socket object
     * @param OperationInterface $nextOperation Desirable next operation
     * @param int                $bytesWritten Output: value returned by the socket write,
     *                                         or 0 when the operation has no data
     *
     * @return OperationInterface Actual next operation
     */
    private function writeDataToSocket(
        WriteOperation $operation,
        SocketInterface $socket,
        OperationInterface $nextOperation = null,
        &$bytesWritten = null
    ) {
        $bytesWritten = 0;
        $unfinished   = null;

        if ($operation->hasData()) {
            $payload  = $operation->getData();
            $expected = strlen($payload);
            $sent     = $socket->write($payload, $operation->isOutOfBand());

            if ($expected !== $sent) {
                // Keep whatever follows the first $sent bytes for a later attempt.
                $unfinished = $this->makeInProgressWriteOperation($operation, $nextOperation);
                $unfinished->setData(
                    substr($unfinished->getData(), $sent)
                );
            }

            $bytesWritten = $sent;
        }

        if ($unfinished !== null) {
            return $unfinished;
        }

        if ($operation instanceof InProgressWriteOperation) {
            /** @var InProgressWriteOperation $operation */
            return $operation->getNextOperation();
        }

        return $nextOperation;
    }

    /**
     * Returns an in-progress representation of the given write operation.
     *
     * An operation that already is an InProgressWriteOperation is returned unchanged;
     * otherwise a new InProgressWriteOperation is created from the next operation and
     * the current operation's data.
     *
     * @param WriteOperation     $operation Current operation object
     * @param OperationInterface $nextOperation Next planned operation
     *
     * @return InProgressWriteOperation Next operation object
     */
    private function makeInProgressWriteOperation(WriteOperation $operation, OperationInterface $nextOperation = null)
    {
        if ($operation instanceof InProgressWriteOperation) {
            return $operation;
        }

        return new InProgressWriteOperation($nextOperation, $operation->getData());
    }
}
132