GuardianStage::handleDeadConnection()   B
last analyzed

Complexity

Conditions 4
Paths 5

Size

Total Lines 28
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 28
ccs 20
cts 20
cp 1
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 18
nc 5
nop 1
crap 4
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor\Pipeline;
11
12
use AsyncSockets\Event\DataAlertEvent;
13
use AsyncSockets\Exception\UnmanagedSocketException;
14
use AsyncSockets\Frame\EmptyFramePicker;
15
use AsyncSockets\Operation\NullOperation;
16
use AsyncSockets\Operation\ReadOperation;
17
use AsyncSockets\RequestExecutor\ExecutionContext;
18
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
19
use AsyncSockets\RequestExecutor\RequestExecutorInterface;
20
21
/**
 * Class GuardianStage
 *
 * Pipeline stage that watches for sockets which look unmanaged by client code
 * ("zombies"): descriptors whose request is not complete and whose current
 * operation is either a NullOperation or a ReadOperation using an
 * EmptyFramePicker. For each such descriptor the client is notified with a
 * DataAlertEvent up to MAX_ATTEMPTS_PER_SOCKET times; if the descriptor is
 * still a zombie candidate after the last notification, exception subscribers
 * are called with an UnmanagedSocketException and the descriptor is passed to
 * the disconnect stage.
 */
class GuardianStage extends AbstractStage
{
    /**
     * Maximum socket attempts to execute
     */
    const MAX_ATTEMPTS_PER_SOCKET = 25;

    /**
     * Remaining attempts per zombie candidate, indexed by spl_object_hash() of the
     * RequestDescriptor. When the value reaches zero the connection is killed.
     *
     * @var int[]
     */
    private $candidates = [];

    /**
     * Disconnect stage
     *
     * @var DisconnectStage
     */
    private $disconnectStage;

    /**
     * GuardianStage constructor.
     *
     * @param RequestExecutorInterface $executor         Request executor
     * @param EventCaller              $eventCaller      Event caller
     * @param ExecutionContext         $executionContext Execution context
     * @param DisconnectStage          $disconnectStage  Disconnect stage
     */
    public function __construct(
        RequestExecutorInterface $executor,
        EventCaller $eventCaller,
        ExecutionContext $executionContext,
        DisconnectStage $disconnectStage
    ) {
        parent::__construct($executor, $eventCaller, $executionContext);
        $this->disconnectStage = $disconnectStage;
    }

    /** {@inheritdoc} */
    public function processStage(array $requestDescriptors)
    {
        // Keep (with their original keys) only descriptors that were not killed.
        $result = [];
        foreach ($requestDescriptors as $key => $descriptor) {
            if (!$this->handleDeadConnection($descriptor)) {
                $result[$key] = $descriptor;
            }
        }

        return $result;
    }

    /**
     * Check if this request is alive and managed by client code
     *
     * Non-candidates have their attempt counter cleared. Candidates trigger a
     * DataAlertEvent; if the descriptor is still a candidate afterwards, one
     * attempt is consumed and, once no attempts remain, the connection is killed.
     *
     * @param RequestDescriptor $descriptor Object to test
     *
     * @return bool True if connection killed, false otherwise
     */
    private function handleDeadConnection(RequestDescriptor $descriptor)
    {
        $key = spl_object_hash($descriptor);
        if (!$this->isZombieCandidate($descriptor)) {
            unset($this->candidates[$key]);
            return false;
        }

        if (!isset($this->candidates[$key])) {
            $this->candidates[$key] = self::MAX_ATTEMPTS_PER_SOCKET;
        }

        $this->notifyDataAlert(
            $descriptor,
            self::MAX_ATTEMPTS_PER_SOCKET - $this->candidates[$key] + 1,
            self::MAX_ATTEMPTS_PER_SOCKET
        );

        // The alert may have replaced the descriptor's operation (see notifyDataAlert()).
        // If the descriptor is no longer a zombie candidate, the client has taken over:
        // forget the counter instead of consuming an attempt, so that a reaction on the
        // very last attempt does not still get the connection killed.
        if (!$this->isZombieCandidate($descriptor)) {
            unset($this->candidates[$key]);
            return false;
        }

        --$this->candidates[$key];
        if ($this->candidates[$key] > 0) {
            return false;
        }

        // Drop the counter before killing, so that no stale entry survives if the
        // kill propagates an exception (NOTE(review): whether it can throw is not
        // visible from this class).
        unset($this->candidates[$key]);
        $this->killZombieConnection($descriptor);

        return true;
    }

    /**
     * Check if this socket can be a zombie
     *
     * A descriptor is a candidate when its request is not marked complete and its
     * operation is a NullOperation, or a ReadOperation with an EmptyFramePicker.
     *
     * @param RequestDescriptor $descriptor Descriptor object
     *
     * @return bool
     */
    private function isZombieCandidate(RequestDescriptor $descriptor)
    {
        $metadata = $descriptor->getMetadata();
        if ($metadata[RequestExecutorInterface::META_REQUEST_COMPLETE]) {
            return false;
        }

        $operation = $descriptor->getOperation();
        if ($operation instanceof NullOperation) {
            return true;
        }

        return $operation instanceof ReadOperation
            && $operation->getFramePicker() instanceof EmptyFramePicker;
    }

    /**
     * Closes connection, as we suppose it is unmanaged
     *
     * Exception subscribers are called with an UnmanagedSocketException first,
     * then the descriptor is handed to the disconnect stage.
     *
     * @param RequestDescriptor $descriptor Descriptor object to kill
     *
     * @return void
     */
    private function killZombieConnection(RequestDescriptor $descriptor)
    {
        $this->eventCaller->callExceptionSubscribers(
            $descriptor,
            UnmanagedSocketException::zombieSocketDetected($descriptor->getSocket()),
            $this->executor,
            $this->executionContext
        );

        $this->disconnectStage->disconnect($descriptor);
    }

    /**
     * Notify client about unhandled data in socket
     *
     * Dispatches a DataAlertEvent to the socket subscribers. If the event reports
     * a next operation afterwards, it is installed on the descriptor.
     *
     * @param RequestDescriptor $descriptor    Socket operation descriptor
     * @param int               $attempt       Current attempt number from 1
     * @param int               $totalAttempts Total attempts
     *
     * @return void
     */
    private function notifyDataAlert(
        RequestDescriptor $descriptor,
        $attempt,
        $totalAttempts
    ) {
        $socket = $descriptor->getSocket();
        $meta   = $this->executor->socketBag()->getSocketMetaData($socket);
        $event  = new DataAlertEvent(
            $this->executor,
            $socket,
            $meta[RequestExecutorInterface::META_USER_CONTEXT],
            $attempt,
            $totalAttempts
        );

        $this->callSocketSubscribers($descriptor, $event);

        $operation = $event->getNextOperation();
        if ($operation) {
            $descriptor->setOperation($operation);
        }
    }
}
185