Completed
Branch 0.4-dev (97f287)
by Evgenij
02:20
created

AbstractRequestExecutor   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 214
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 25
lcom 1
cbo 7
dl 0
loc 214
ccs 75
cts 75
cp 1
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A socketBag() 0 4 1
A withEventHandler() 0 5 1
A withLimitationSolver() 0 8 2
A isExecuting() 0 4 1
A stopRequest() 0 8 2
A initializeRequest() 0 4 1
doExecuteRequest() 0 1 ?
A terminateRequest() 0 4 1
disconnectItems() 0 1 ?
A invokeEvent() 0 7 3
C executeRequest() 0 52 9
A emergencyShutdown() 0 12 3
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
namespace AsyncSockets\RequestExecutor;
11
12
use AsyncSockets\Configuration\Configuration;
13
use AsyncSockets\Event\Event;
14
use AsyncSockets\Exception\SocketException;
15
use AsyncSockets\Exception\StopRequestExecuteException;
16
use AsyncSockets\RequestExecutor\Metadata\RequestDescriptor;
17
use AsyncSockets\RequestExecutor\Metadata\SocketBag;
18
use AsyncSockets\RequestExecutor\Pipeline\EventCaller;
19
20
/**
 * Class AbstractRequestExecutor
 *
 * Base class driving the life cycle of a request: it wires event handlers
 * into an EventCaller, delegates the actual network work to
 * doExecuteRequest(), and takes care of stop requests, socket errors and
 * unexpected exceptions around that call.
 */
abstract class AbstractRequestExecutor implements RequestExecutorInterface, EventHandlerInterface
{
    /**
     * Decider for request limitation. Lazily defaults to NoLimitationSolver
     * when executeRequest() is called without a solver being set.
     *
     * @var LimitationSolverInterface
     */
    protected $solver;

    /**
     * Registered event handlers, indexed by spl_object_hash() of the handler,
     * so registering the same instance twice keeps a single entry.
     *
     * @var EventHandlerInterface[]
     */
    protected $eventHandlers = [];

    /**
     * Socket bag
     *
     * @var SocketBag
     */
    protected $socketBag;

    /**
     * Flag, indicating that stopRequest() has been called for the current run
     *
     * @var bool
     */
    private $isRequestStopped = false;

    /**
     * Flag, indicating stopping request is in progress. While it is set,
     * invokeEvent() no longer throws StopRequestExecuteException.
     *
     * @var bool
     */
    private $isRequestStopInProgress = false;

    /**
     * Flag whether request is executing
     *
     * @var bool
     */
    private $isExecuting = false;

    /**
     * AbstractRequestExecutor constructor.
     *
     * @param Configuration $configuration Configuration for executor; its connect
     *                                     and I/O timeouts are passed to the socket bag
     */
    public function __construct(Configuration $configuration)
    {
        $this->socketBag = new SocketBag($this, $configuration->getConnectTimeout(), $configuration->getIoTimeout());
    }

    /** {@inheritdoc} */
    public function socketBag()
    {
        return $this->socketBag;
    }

    /** {@inheritdoc} */
    public function withEventHandler(EventHandlerInterface $handler)
    {
        $key = spl_object_hash($handler);
        $this->eventHandlers[$key] = $handler;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \BadMethodCallException If called while a request is executing
     */
    public function withLimitationSolver(LimitationSolverInterface $solver)
    {
        if ($this->isExecuting()) {
            throw new \BadMethodCallException('Can not change limitation solver during request processing');
        }

        $this->solver = $solver;
    }

    /** {@inheritdoc} */
    public function isExecuting()
    {
        return $this->isExecuting;
    }

    /**
     * {@inheritdoc}
     *
     * Handler order inside the EventCaller: user handlers, then the solver
     * (if it is an event handler), then whatever initializeRequest() adds,
     * then this executor itself.
     *
     * @throws \BadMethodCallException If a request is already in progress
     * @throws \Exception              Any exception other than StopRequestExecuteException
     *                                 or SocketException is rethrown after an emergency shutdown
     */
    public function executeRequest()
    {
        if ($this->isExecuting()) {
            throw new \BadMethodCallException('Request is already in progress');
        }

        // Both stop flags must be cleared for every run. If the "in progress"
        // flag survived a previously stopped request, invokeEvent() would never
        // throw again and stopRequest() would be silently ignored on reuse.
        $this->isRequestStopped        = false;
        $this->isRequestStopInProgress = false;
        $this->solver                  = $this->solver ?: new NoLimitationSolver();

        $this->isExecuting = true;

        $eventCaller = new EventCaller($this);

        // Becomes true once the normal path has finalized the solver and
        // terminated the request, so the shared tail below does not repeat it.
        $isFinalized = false;

        try {
            foreach ($this->eventHandlers as $handler) {
                $eventCaller->addHandler($handler);
            }

            if ($this->solver instanceof EventHandlerInterface) {
                $eventCaller->addHandler($this->solver);
            }

            $this->initializeRequest($eventCaller);

            $eventCaller->addHandler($this);

            $this->solver->initialize($this);
            $this->doExecuteRequest($eventCaller);
            $this->solver->finalize($this);

            $this->terminateRequest();
            $isFinalized = true;
        } catch (StopRequestExecuteException $e) {
            // Stop was requested: keep the flag raised so events fired while
            // disconnecting do not trigger another stop exception.
            $this->isRequestStopInProgress = true;
            $this->disconnectItems($this->socketBag->getItems());
        } catch (SocketException $e) {
            // Report the socket failure for every item, then disconnect them all.
            foreach ($this->socketBag->getItems() as $item) {
                $eventCaller->setCurrentOperation($item);
                $eventCaller->callExceptionSubscribers($item, $e);
            }

            $this->disconnectItems($this->socketBag->getItems());
        } catch (\Exception $e) {
            // Unexpected failure: mark executor idle first, close everything,
            // run the regular cleanup and let the caller see the exception.
            $this->isExecuting = false;
            $this->emergencyShutdown();
            $this->solver->finalize($this);
            $this->terminateRequest();
            throw $e;
        }

        // Cleanup for the recoverable catch branches (or for a normal run that
        // was interrupted before it could finish its own finalization).
        if (!$isFinalized) {
            $this->solver->finalize($this);
            $this->terminateRequest();
        }

        $this->isExecuting = false;
    }

    /**
     * {@inheritdoc}
     *
     * Only raises a flag; the request is actually interrupted on the next
     * invokeEvent() call.
     *
     * @throws \BadMethodCallException If no request is executing
     */
    public function stopRequest()
    {
        if (!$this->isExecuting()) {
            throw new \BadMethodCallException('Can not stop inactive request');
        }

        $this->isRequestStopped = true;
    }

    /**
     * Prepare executor for request. Called by executeRequest() after user and
     * solver handlers were added to the event caller and before the executor
     * adds itself. Default implementation does nothing.
     *
     * @param EventCaller $eventCaller Event caller
     *
     * @return void
     */
    protected function initializeRequest(EventCaller $eventCaller)
    {
        // empty body
    }

    /**
     * Execute network request
     *
     * @param EventCaller $eventCaller Event caller object
     *
     * @return void
     */
    abstract protected function doExecuteRequest(EventCaller $eventCaller);

    /**
     * Terminate request in executor. Called by executeRequest() after the
     * solver has been finalized. Default implementation does nothing.
     *
     * @return void
     */
    protected function terminateRequest()
    {
        // empty body
    }

    /**
     * Disconnect given sockets
     *
     * @param RequestDescriptor[] $items Sockets' operations to disconnect
     *
     * @return mixed
     */
    abstract protected function disconnectItems(array $items);

    /**
     * Shutdown all sockets in case of unhandled exception
     *
     * Every item in the socket bag gets its socket closed (errors while
     * closing are ignored on purpose) and is marked as complete.
     *
     * @return void
     */
    private function emergencyShutdown()
    {
        foreach ($this->socketBag->getItems() as $item) {
            try {
                $item->getSocket()->close();
            } catch (\Exception $e) {
                // nothing required
            }

            $item->setMetadata(self::META_REQUEST_COMPLETE, true);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Turns a pending stopRequest() into a StopRequestExecuteException, once
     * per run.
     *
     * @throws StopRequestExecuteException When a stop was requested and is not yet in progress
     */
    public function invokeEvent(Event $event)
    {
        if ($this->isRequestStopped && !$this->isRequestStopInProgress) {
            $this->isRequestStopInProgress = true;
            throw new StopRequestExecuteException();
        }
    }
}
237