LibEventRequestExecutor::stopRequest()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 0
crap 1
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor;
11
12
use AsyncSockets\Configuration\Configuration;
13
use AsyncSockets\RequestExecutor\LibEvent\LeBase;
14
use AsyncSockets\RequestExecutor\LibEvent\LeCallbackInterface;
15
use AsyncSockets\RequestExecutor\LibEvent\LeEvent;
16
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
17
use AsyncSockets\RequestExecutor\Pipeline\EventCaller;
18
use AsyncSockets\RequestExecutor\Pipeline\PipelineStageInterface;
19
use AsyncSockets\RequestExecutor\Pipeline\StageFactoryInterface;
20
use AsyncSockets\RequestExecutor\Pipeline\TimeoutStage;
21
use AsyncSockets\RequestExecutor\Specification\ConnectionLessSocketSpecification;
22
23
/**
 * Request executor that runs socket operations inside a libevent loop.
 *
 * For every request a fresh LeBase loop and a fresh set of pipeline stages are created
 * in initializeRequest() and released again in terminateRequest(). The executor registers
 * itself as the callback (LeCallbackInterface) of every LeEvent it adds to the loop.
 */
class LibEventRequestExecutor extends AbstractRequestExecutor implements LeCallbackInterface
{
    /**
     * Libevent handle; set only between initializeRequest() and terminateRequest()
     *
     * @var LeBase|null
     */
    private $base;

    /**
     * Connect stage
     *
     * @var PipelineStageInterface|null
     */
    private $connectStage;

    /**
     * Delay stage
     *
     * @var PipelineStageInterface|null
     */
    private $delayStage;

    /**
     * I/O stage
     *
     * @var PipelineStageInterface|null
     */
    private $ioStage;

    /**
     * Disconnect stage
     *
     * @var PipelineStageInterface|null
     */
    private $disconnectStage;

    /**
     * Stage factory
     *
     * @var StageFactoryInterface
     */
    private $stageFactory;

    /**
     * Timeout stage
     *
     * @var TimeoutStage|null
     */
    private $timeoutStage;

    /**
     * Flags for descriptors which already had their first timeout resolved,
     * keyed by spl_object_hash() of the RequestDescriptor
     *
     * @var bool[]
     */
    private $connectedDescriptors = [];

    /**
     * LibEventRequestExecutor constructor.
     *
     * @param StageFactoryInterface $stageFactory Stage factory
     * @param Configuration         $configuration Configuration for executor
     */
    public function __construct(StageFactoryInterface $stageFactory, Configuration $configuration)
    {
        parent::__construct($configuration);
        $this->stageFactory = $stageFactory;
    }

    /** {@inheritdoc} */
    protected function doExecuteRequest(EventCaller $eventCaller, ExecutionContext $executionContext)
    {
        // Register events for everything the connect stage returns, then hand control to the loop.
        $this->connectSockets();
        $this->base->startLoop();
    }

    /** {@inheritdoc} */
    protected function initializeRequest(EventCaller $eventCaller, ExecutionContext $executionContext)
    {
        parent::initializeRequest($eventCaller, $executionContext);
        $this->base = new LeBase();

        $this->connectStage    = $this->stageFactory->createConnectStage(
            $this,
            $executionContext,
            $eventCaller,
            $this->solver
        );
        $this->ioStage         = $this->stageFactory->createIoStage($this, $executionContext, $eventCaller);
        $this->disconnectStage = $this->stageFactory->createDisconnectStage($this, $executionContext, $eventCaller);
        $this->delayStage      = $this->stageFactory->createDelayStage($this, $executionContext, $eventCaller);
        $this->timeoutStage    = new TimeoutStage($this, $eventCaller, $executionContext);
    }

    /** {@inheritdoc} */
    protected function terminateRequest(ExecutionContext $executionContext)
    {
        parent::terminateRequest($executionContext);
        $this->base = null;

        // Release every per-request object created in initializeRequest().
        $this->connectStage    = null;
        $this->ioStage         = null;
        $this->disconnectStage = null;
        $this->delayStage      = null;
        $this->timeoutStage    = null;

        // The flags are keyed by spl_object_hash(), and PHP may hand the same hash to another
        // object once the original one is destroyed. Dropping them here prevents a later request
        // from mistaking a new descriptor for an already connected one and keeps the map bounded.
        $this->connectedDescriptors = [];
    }

    /** {@inheritdoc} */
    protected function disconnectItems(array $items)
    {
        $this->disconnectStage->processStage($items);
    }

    /** {@inheritdoc} */
    public function stopRequest()
    {
        // The parent call runs first; the loop is only broken when it returns normally.
        parent::stopRequest();
        $this->base->breakLoop();
    }

    /**
     * Run the connect stage over all items of the socket bag and set up an event
     * for every descriptor the stage returns
     *
     * @return void
     */
    private function connectSockets()
    {
        $items = $this->socketBag->getItems();
        foreach ($this->connectStage->processStage($items) as $item) {
            $this->setupEvent($item, $this->resolveTimeout($item));
        }
    }

    /**
     * Setup libevent for given operation
     *
     * Descriptors matching ConnectionLessSocketSpecification are not added to the loop: if the
     * delay stage returns a non-empty result for them, they are processed immediately as a read
     * event. All other descriptors pass through the delay stage and get a LeEvent in the loop.
     *
     * @param RequestDescriptor $descriptor Metadata object
     * @param int|null          $timeout Timeout in seconds
     *
     * @return void
     */
    private function setupEvent(RequestDescriptor $descriptor, $timeout)
    {
        $specification = new ConnectionLessSocketSpecification();
        if (!$specification->isSatisfiedBy($descriptor)) {
            $this->delayStage->processStage([$descriptor]);
            $event = new LeEvent($this, $descriptor, $timeout);
            $this->base->addEvent($event);

            return;
        }

        if ($this->delayStage->processStage([$descriptor])) {
            $this->onEvent($descriptor, LeCallbackInterface::EVENT_READ);
        }
    }

    /** {@inheritdoc} */
    public function onEvent(RequestDescriptor $requestDescriptor, $type)
    {
        $doResetEvent = false;
        switch ($type) {
            case LeCallbackInterface::EVENT_READ:
                // fall through
            case LeCallbackInterface::EVENT_WRITE:
                // An empty result from the I/O stage means the event has to be registered again.
                $result       = $this->ioStage->processStage([ $requestDescriptor ]);
                $doResetEvent = empty($result);

                break;
            case LeCallbackInterface::EVENT_TIMEOUT:
                $doResetEvent = $this->timeoutStage->handleTimeoutOnDescriptor($requestDescriptor);
                break;
        }

        if ($doResetEvent) {
            $meta = $requestDescriptor->getMetadata();
            $this->setupEvent($requestDescriptor, $meta[RequestExecutorInterface::META_IO_TIMEOUT]);
        } else {
            // Also reached for an unknown event type, since $doResetEvent stays false.
            $this->disconnectStage->processStage([$requestDescriptor]);
        }

        // Re-run the connect stage so that descriptors it returns now get their events registered.
        $this->connectSockets();
    }

    /**
     * Resolves timeout for setting up event
     *
     * The first call for a descriptor yields its connection timeout and marks the descriptor
     * as connected; every following call yields its I/O timeout.
     *
     * @param RequestDescriptor $descriptor Descriptor object
     *
     * @return int|null Timeout value taken from the descriptor metadata
     */
    private function resolveTimeout(RequestDescriptor $descriptor)
    {
        $meta   = $descriptor->getMetadata();
        $key    = spl_object_hash($descriptor);
        $result = $meta[RequestExecutorInterface::META_IO_TIMEOUT];
        if (!isset($this->connectedDescriptors[$key])) {
            $result                           = $meta[RequestExecutorInterface::META_CONNECTION_TIMEOUT];
            $this->connectedDescriptors[$key] = true;
        }

        return $result;
    }
}
228