Completed
Branch master (1d1574)
by Evgenij
18:38
created

LibEventRequestExecutor::onEvent()   B

Complexity

Conditions 5
Paths 8

Size

Total Lines 25
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5

Importance

Changes 7
Bugs 1 Features 1
Metric Value
c 7
b 1
f 1
dl 0
loc 25
ccs 17
cts 17
cp 1
rs 8.439
cc 5
eloc 17
nc 8
nop 2
crap 5
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor;
11
12
use AsyncSockets\Configuration\Configuration;
13
use AsyncSockets\RequestExecutor\LibEvent\LeBase;
14
use AsyncSockets\RequestExecutor\LibEvent\LeCallbackInterface;
15
use AsyncSockets\RequestExecutor\LibEvent\LeEvent;
16
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
17
use AsyncSockets\RequestExecutor\Pipeline\EventCaller;
18
use AsyncSockets\RequestExecutor\Pipeline\PipelineStageInterface;
19
use AsyncSockets\RequestExecutor\Pipeline\StageFactoryInterface;
20
use AsyncSockets\RequestExecutor\Pipeline\TimeoutStage;
21
use AsyncSockets\RequestExecutor\Specification\ConnectionLessSocketSpecification;
22
23
/**
24
 * Class LibEventRequestExecutor
25
 */
26
class LibEventRequestExecutor extends AbstractRequestExecutor implements LeCallbackInterface
27
{
28
    /**
29
     * Libevent handle
30
     *
31
     * @var LeBase
32
     */
33
    private $base;
34
35
    /**
36
     * Connect stage
37
     *
38
     * @var PipelineStageInterface
39
     */
40
    private $connectStage;
41
42
    /**
43
     * Delay stage
44
     *
45
     * @var PipelineStageInterface
46
     */
47
    private $delayStage;
48
49
    /**
50
     * I/O stage
51
     *
52
     * @var PipelineStageInterface
53
     */
54
    private $ioStage;
55
56
    /**
57
     * Disconnect stage
58
     *
59
     * @var PipelineStageInterface
60
     */
61
    private $disconnectStage;
62
63
    /**
64
     * Stage factory
65
     *
66
     * @var StageFactoryInterface
67
     */
68
    private $stageFactory;
69
70
    /**
71
     * Timeout stage
72
     *
73
     * @var TimeoutStage
74
     */
75
    private $timeoutStage;
76
77
    /**
78
     * Array of connected sockets information indexed by RequestDescriptor
79
     *
80
     * @var bool[]
81
     */
82
    private $connectedDescriptors = [];
83
84
    /**
85
     * LibEventRequestExecutor constructor.
86
     *
87
     * @param StageFactoryInterface $stageFactory Stage factory
88
     * @param Configuration   $configuration Configuration for executor
89
     */
90 65
    public function __construct(StageFactoryInterface $stageFactory, Configuration $configuration)
91
    {
92 65
        parent::__construct($configuration);
93 65
        $this->stageFactory = $stageFactory;
94 65
    }
95
96
    /** {@inheritdoc} */
97 63
    protected function doExecuteRequest(EventCaller $eventCaller)
98
    {
99 63
        $this->connectSockets();
100 55
        $this->base->startLoop();
101 29
    }
102
103
    /** {@inheritdoc} */
104 63
    protected function initializeRequest(EventCaller $eventCaller)
105
    {
106 63
        parent::initializeRequest($eventCaller);
107 63
        $this->base        = new LeBase();
108
109 63
        $this->connectStage    = $this->stageFactory->createConnectStage($this, $eventCaller, $this->solver);
110 63
        $this->ioStage         = $this->stageFactory->createIoStage($this, $eventCaller);
111 63
        $this->disconnectStage = $this->stageFactory->createDisconnectStage($this, $eventCaller);
112 63
        $this->delayStage      = $this->stageFactory->createDelayStage($this, $eventCaller);
113 63
        $this->timeoutStage    = new TimeoutStage($this, $eventCaller);
114 63
    }
115
116
    /** {@inheritdoc} */
117 63
    protected function terminateRequest()
118
    {
119 63
        parent::terminateRequest();
120 63
        $this->base        = null;
121
122 63
        $this->connectStage    = null;
123 63
        $this->ioStage         = null;
124 63
        $this->disconnectStage = null;
125 63
        $this->timeoutStage    = null;
126 63
    }
127
128
    /** {@inheritdoc} */
129 9
    protected function disconnectItems(array $items)
130
    {
131 9
        $this->disconnectStage->processStage($items);
132 9
    }
133
134
    /**
135
     * Setup libevent for given operation
136
     *
137
     * @param RequestDescriptor $descriptor Metadata object
138
     * @param int|null          $timeout Timeout in seconds
139
     *
140
     * @return void
141
     */
142 49
    private function setupEvent(RequestDescriptor $descriptor, $timeout)
143
    {
144 49
        $specification = new ConnectionLessSocketSpecification();
145 49
        if (!$specification->isSatisfiedBy($descriptor)) {
146 48
            $this->delayStage->processStage([$descriptor]);
147 48
            $event = new LeEvent($this, $descriptor, $timeout);
148 48
            $this->base->addEvent($event);
149 48
        } else {
150 1
            if ($this->delayStage->processStage([$descriptor])) {
151 1
                $this->onEvent($descriptor, LeCallbackInterface::EVENT_READ);
152 1
            }
153
        }
154 49
    }
155
156
    /** {@inheritdoc} */
157 3
    public function stopRequest()
158
    {
159 3
        parent::stopRequest();
160 2
        $this->base->breakLoop();
161 2
    }
162
163
    /** {@inheritdoc} */
164 49
    public function onEvent(RequestDescriptor $requestDescriptor, $type)
165
    {
166 49
        $doResetEvent = false;
167
        switch ($type) {
168 49
            case LeCallbackInterface::EVENT_READ:
169
                // fall down
170 49
            case LeCallbackInterface::EVENT_WRITE:
171 43
                $result       = $this->ioStage->processStage([ $requestDescriptor ]);
172 27
                $doResetEvent = empty($result);
173
174 27
                break;
175 8
            case LeCallbackInterface::EVENT_TIMEOUT:
176 8
                $doResetEvent = $this->timeoutStage->handleTimeoutOnDescriptor($requestDescriptor);
177 6
                break;
178
        }
179
180 31
        if ($doResetEvent) {
181 2
            $meta = $requestDescriptor->getMetadata();
182 2
            $this->setupEvent($requestDescriptor, $meta[self::META_IO_TIMEOUT]);
183 2
        } else {
184 31
            $this->disconnectStage->processStage([$requestDescriptor]);
185
        }
186
187 25
        $this->connectSockets();
188 23
    }
189
190
    /**
191
     * Connect sockets to server
192
     *
193
     * @return void
194
     */
195 63
    private function connectSockets()
196
    {
197 63
        $items = $this->socketBag->getItems();
198 63
        foreach ($this->connectStage->processStage($items) as $item) {
199 49
            $this->setupEvent($item, $this->resolveTimeout($item));
200 55
        }
201 55
    }
202
203
    /**
204
     * Resolves timeout for setting up event
205
     *
206
     * @param RequestDescriptor $descriptor Descriptor object
207
     *
208
     * @return int
209
     */
210 49
    private function resolveTimeout(RequestDescriptor $descriptor)
211
    {
212 49
        $meta   = $descriptor->getMetadata();
213 49
        $key    = spl_object_hash($descriptor);
214 49
        $result = $meta[RequestExecutorInterface::META_IO_TIMEOUT];
215 49
        if (!isset($this->connectedDescriptors[$key])) {
216 49
            $result                           = $meta[RequestExecutorInterface::META_CONNECTION_TIMEOUT];
217 49
            $this->connectedDescriptors[$key] = true;
218 49
        }
219
220 49
        return $result;
221
    }
222
}
223