Completed
Branch 0.4-dev (97f287)
by Evgenij
02:20
created

LibEventRequestExecutor   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 197
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 9
dl 0
loc 197
ccs 75
cts 75
cp 1
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A doExecuteRequest() 0 5 1
A initializeRequest() 0 11 1
A terminateRequest() 0 10 1
A disconnectItems() 0 4 1
A stopRequest() 0 5 1
A connectSockets() 0 7 2
A setupEvent() 0 13 3
B onEvent() 0 25 5
A resolveTimeout() 0 12 2
1
<?php
2
/**
 * Async sockets
 *
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
 *
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */
10
namespace AsyncSockets\RequestExecutor;
11
12
use AsyncSockets\Configuration\Configuration;
13
use AsyncSockets\RequestExecutor\LibEvent\LeBase;
14
use AsyncSockets\RequestExecutor\LibEvent\LeCallbackInterface;
15
use AsyncSockets\RequestExecutor\LibEvent\LeEvent;
16
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
17
use AsyncSockets\RequestExecutor\Pipeline\EventCaller;
18
use AsyncSockets\RequestExecutor\Pipeline\PipelineStageInterface;
19
use AsyncSockets\RequestExecutor\Pipeline\StageFactoryInterface;
20
use AsyncSockets\RequestExecutor\Pipeline\TimeoutStage;
21
use AsyncSockets\RequestExecutor\Specification\ConnectionLessSocketSpecification;
22
23
/**
 * Class LibEventRequestExecutor
 *
 * Request executor that waits for socket activity through a LeBase loop.
 *
 * For every request a fresh LeBase and a fresh set of pipeline stages are created in
 * initializeRequest() and released again in terminateRequest(). The executor passes
 * itself as the LeCallbackInterface to each LeEvent it creates and handles the
 * resulting notifications in onEvent().
 */
class LibEventRequestExecutor extends AbstractRequestExecutor implements LeCallbackInterface
{
    /**
     * Libevent handle; set only between initializeRequest() and terminateRequest()
     *
     * @var LeBase|null
     */
    private $base;

    /**
     * Connect stage
     *
     * @var PipelineStageInterface|null
     */
    private $connectStage;

    /**
     * Delay stage
     *
     * @var PipelineStageInterface|null
     */
    private $delayStage;

    /**
     * I/O stage
     *
     * @var PipelineStageInterface|null
     */
    private $ioStage;

    /**
     * Disconnect stage
     *
     * @var PipelineStageInterface|null
     */
    private $disconnectStage;

    /**
     * Stage factory used to build the per-request stages
     *
     * @var StageFactoryInterface
     */
    private $stageFactory;

    /**
     * Timeout stage
     *
     * @var TimeoutStage|null
     */
    private $timeoutStage;

    /**
     * Flags for descriptors that have already been given their connection timeout,
     * indexed by spl_object_hash() of the RequestDescriptor
     *
     * @var bool[]
     */
    private $connectedDescriptors = [];

    /**
     * LibEventRequestExecutor constructor.
     *
     * @param StageFactoryInterface $stageFactory Stage factory
     * @param Configuration         $configuration Configuration for executor
     */
    public function __construct(StageFactoryInterface $stageFactory, Configuration $configuration)
    {
        parent::__construct($configuration);
        $this->stageFactory = $stageFactory;
    }

    /**
     * {@inheritdoc}
     *
     * Runs the connect stage for the sockets in the bag and then starts the loop
     * on the libevent handle.
     */
    protected function doExecuteRequest(EventCaller $eventCaller)
    {
        $this->connectSockets();
        $this->base->startLoop();
    }

    /**
     * {@inheritdoc}
     *
     * Creates the libevent handle and all pipeline stages used during one request.
     */
    protected function initializeRequest(EventCaller $eventCaller)
    {
        parent::initializeRequest($eventCaller);
        $this->base = new LeBase();

        $this->connectStage    = $this->stageFactory->createConnectStage($this, $eventCaller, $this->solver);
        $this->ioStage         = $this->stageFactory->createIoStage($this, $eventCaller);
        $this->disconnectStage = $this->stageFactory->createDisconnectStage($this, $eventCaller);
        $this->delayStage      = $this->stageFactory->createDelayStage($this, $eventCaller);
        $this->timeoutStage    = new TimeoutStage($this, $eventCaller);
    }

    /**
     * {@inheritdoc}
     *
     * Releases everything that initializeRequest() created and forgets which
     * descriptors were seen during the finished request.
     */
    protected function terminateRequest()
    {
        parent::terminateRequest();
        $this->base = null;

        // Every stage built in initializeRequest() is dropped, the delay stage included.
        $this->connectStage    = null;
        $this->ioStage         = null;
        $this->disconnectStage = null;
        $this->delayStage      = null;
        $this->timeoutStage    = null;

        // The map is keyed by spl_object_hash(), and PHP may hand a freed object's hash
        // to a new object. Keeping entries past the request would let the map grow
        // without bound and could make resolveTimeout() treat a new descriptor as
        // already connected.
        $this->connectedDescriptors = [];
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the disconnect stage.
     */
    protected function disconnectItems(array $items)
    {
        $this->disconnectStage->processStage($items);
    }

    /**
     * {@inheritdoc}
     *
     * After the parent implementation has run, asks the libevent handle to break its loop.
     */
    public function stopRequest()
    {
        parent::stopRequest();
        $this->base->breakLoop();
    }

    /**
     * Connect sockets to server
     *
     * Passes all items of the socket bag through the connect stage and sets up an
     * event for every descriptor that stage returns.
     *
     * @return void
     */
    private function connectSockets()
    {
        $items = $this->socketBag->getItems();
        foreach ($this->connectStage->processStage($items) as $item) {
            $this->setupEvent($item, $this->resolveTimeout($item));
        }
    }

    /**
     * Setup libevent for given operation
     *
     * Descriptors that do not satisfy ConnectionLessSocketSpecification are registered
     * on the libevent handle. Descriptors that do satisfy it are never registered;
     * they are dispatched to onEvent() immediately as a read event, provided the delay
     * stage returns a non-empty result for them.
     *
     * @param RequestDescriptor $descriptor Metadata object
     * @param int|null          $timeout Timeout in seconds
     *
     * @return void
     */
    private function setupEvent(RequestDescriptor $descriptor, $timeout)
    {
        $specification = new ConnectionLessSocketSpecification();
        if (!$specification->isSatisfiedBy($descriptor)) {
            // The delay stage is run for the descriptor; its result is not consulted
            // here and the event is registered regardless.
            $this->delayStage->processStage([$descriptor]);
            $event = new LeEvent($this, $descriptor, $timeout);
            $this->base->addEvent($event);
        } else {
            if ($this->delayStage->processStage([$descriptor])) {
                $this->onEvent($descriptor, LeCallbackInterface::EVENT_READ);
            }
        }
    }

    /**
     * {@inheritdoc}
     *
     * Read and write events go through the I/O stage; timeout events go through the
     * timeout stage. Depending on the outcome the descriptor is either armed again
     * with its I/O timeout or handed to the disconnect stage. Finally the connect
     * stage is run again over the socket bag.
     */
    public function onEvent(RequestDescriptor $requestDescriptor, $type)
    {
        // Any type not handled below leaves this false, so the descriptor is disconnected.
        $doResetEvent = false;
        switch ($type) {
            case LeCallbackInterface::EVENT_READ:
                // fall down
            case LeCallbackInterface::EVENT_WRITE:
                $result       = $this->ioStage->processStage([ $requestDescriptor ]);
                // An empty result from the I/O stage means the event has to be armed again.
                $doResetEvent = empty($result);

                break;
            case LeCallbackInterface::EVENT_TIMEOUT:
                $doResetEvent = $this->timeoutStage->handleTimeoutOnDescriptor($requestDescriptor);
                break;
        }

        if ($doResetEvent) {
            $meta = $requestDescriptor->getMetadata();
            $this->setupEvent($requestDescriptor, $meta[self::META_IO_TIMEOUT]);
        } else {
            $this->disconnectStage->processStage([$requestDescriptor]);
        }

        $this->connectSockets();
    }

    /**
     * Resolves timeout for setting up event
     *
     * The first time a descriptor is seen its connection timeout is returned and the
     * descriptor is remembered; every later call for it returns its I/O timeout.
     *
     * @param RequestDescriptor $descriptor Descriptor object
     *
     * @return int|null Timeout value taken from the descriptor metadata
     */
    private function resolveTimeout(RequestDescriptor $descriptor)
    {
        $meta   = $descriptor->getMetadata();
        $key    = spl_object_hash($descriptor);
        $result = $meta[RequestExecutorInterface::META_IO_TIMEOUT];
        if (!isset($this->connectedDescriptors[$key])) {
            $result                           = $meta[RequestExecutorInterface::META_CONNECTION_TIMEOUT];
            $this->connectedDescriptors[$key] = true;
        }

        return $result;
    }
}
223