Completed
Push — master ( 90e9c9...31f528 )
by Sergey
01:42
created

Client::hasPendingRequests()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace seregazhuk\React\Memcached;
4
5
use Evenement\EventEmitter;
6
use React\Promise\Promise;
7
use React\Promise\PromiseInterface;
8
use seregazhuk\React\Memcached\Exception\ConnectionClosedException;
9
use seregazhuk\React\Memcached\Exception\Exception;
10
use seregazhuk\React\Memcached\Exception\FailedCommandException;
11
use seregazhuk\React\Memcached\Exception\WrongCommandException;
12
use seregazhuk\React\Memcached\Protocol\Parser;
13
14
/**
 * Promise-based Memcached client.
 *
 * Commands are invoked as magic methods (see the @method list below). Each
 * call is turned into a raw command by the Parser, written to the Connection
 * and tracked as a pending Request. Incoming responses are matched to pending
 * requests in the order the requests were sent (first in, first out).
 *
 * @method PromiseInterface set(string $key, mixed $value, int $flag = 0, int $exp = 0)
 * @method PromiseInterface version()
 * @method PromiseInterface verbosity(int $level)
 * @method PromiseInterface flushAll()
 * @method PromiseInterface get($key)
 * @method PromiseInterface delete($key)
 * @method PromiseInterface replace($key, $value)
 * @method PromiseInterface incr($key, $value)
 * @method PromiseInterface decr($key, $value)
 * @method PromiseInterface stats()
 * @method PromiseInterface touch($key, $exp)
 * @method PromiseInterface add($key, $value)
 */
class Client extends EventEmitter
{
    /**
     * Builds raw commands and parses raw/individual responses.
     *
     * @var Parser
     */
    protected $parser;

    /**
     * Requests that were written to the connection and are still waiting for
     * a response, in the order they were sent.
     *
     * @var Request[]
     */
    protected $requests = [];

    /**
     * Indicates that the connection is closed.
     *
     * @var bool
     */
    protected $isClosed = false;

    /**
     * Indicates that we don't accept new requests but we are still waiting for
     * pending requests to be resolved.
     *
     * @var bool
     */
    protected $isEnding = false;

    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @param Connection $connection
     * @param Parser $parser
     */
    public function __construct(Connection $connection, Parser $parser)
    {
        $this->parser = $parser;
        $this->connection = $connection;

        $this->setConnectionHandlers();
    }

    /**
     * Subscribes to the connection events this client reacts to:
     *  - "data":   parse the chunk and settle the matching pending requests;
     *  - "failed": reject every pending request with ConnectionClosedException;
     *  - "close":  emit "error" unless the close was requested via end()/close().
     */
    protected function setConnectionHandlers()
    {
        $this->connection->on('data', function ($chunk) {
            $parsed = $this->parser->parseRawResponse($chunk);
            $this->resolveRequests($parsed);
        });

        $this->connection->on('failed', function () {
            $this->rejectPendingRequestsWith(new ConnectionClosedException());
        });

        $this->connection->on('close', function () {
            // close() and end() both set isEnding first, so a close event seen
            // while the flag is still false was not initiated by this client.
            if (!$this->isEnding) {
                $this->emit('error', [new ConnectionClosedException()]);
            }
        });
    }

    /**
     * Sends the command named $name with the given arguments.
     *
     * The returned promise is rejected immediately with
     * ConnectionClosedException when the client is ending/closed, or with
     * WrongCommandException when the parser cannot build the command.
     * Otherwise the command is written to the connection and the request is
     * queued until its response arrives.
     *
     * @param string $name
     * @param array $args
     * @return Promise|PromiseInterface
     */
    public function __call($name, $args)
    {
        $request = new Request($name);

        if ($this->isEnding) {
            $request->reject(new ConnectionClosedException());
        } else {
            try {
                $query = $this->parser->makeCommand($name, $args);
                $this->connection->write($query);
                // Queue only after a successful write so that a rejected
                // command never occupies a slot in the response order.
                $this->requests[] = $request;
            } catch (WrongCommandException $e) {
                $request->reject($e);
            }
        }

        return $request->getPromise();
    }

    /**
     * Settles pending requests with the given responses, oldest request first.
     *
     * A response that the parser reports as a FailedCommandException rejects
     * its request; any other parsed response resolves it. When end() was
     * called and the queue becomes empty, the connection is closed.
     *
     * @param array $responses
     * @throws Exception when there is no pending request at all, or when there
     *                   are more responses than pending requests
     */
    public function resolveRequests(array $responses)
    {
        if (!$this->hasPendingRequests()) {
            throw new Exception('Received unexpected response, no matching request found');
        }

        $hasUnmatchedResponse = false;

        foreach ($responses as $response) {
            /* @var $request Request|null */
            $request = array_shift($this->requests);

            // array_shift() yields null once the queue is exhausted: more
            // responses arrived than requests were sent. Stop here instead of
            // dereferencing null, and report it after the bookkeeping below.
            if ($request === null) {
                $hasUnmatchedResponse = true;
                break;
            }

            try {
                $parsedResponse = $this->parser->parseResponse($request->getCommand(), $response);
                $request->resolve($parsedResponse);
            } catch (FailedCommandException $exception) {
                $request->reject($exception);
            }
        }

        if ($this->isEnding && !$this->hasPendingRequests()) {
            $this->close();
        }

        if ($hasUnmatchedResponse) {
            throw new Exception('Received unexpected response, no matching request found');
        }
    }

    /**
     * Closes the connection when all requests are resolved
     *
     * New requests are rejected from this point on. If nothing is pending the
     * connection is closed right away; otherwise it is closed once the last
     * pending request has been settled.
     */
    public function end()
    {
        $this->isEnding = true;

        if (!$this->hasPendingRequests()) {
            $this->close();
        }
    }

    /**
     * Forces closing the connection and rejects all pending requests
     *
     * Calling it again after the client is closed is a no-op. Emits "close"
     * before the pending requests are rejected with ConnectionClosedException.
     */
    public function close()
    {
        if ($this->isClosed) {
            return;
        }

        $this->isEnding = true;
        $this->isClosed = true;

        $this->connection->close();
        $this->emit('close');

        $this->rejectPendingRequestsWith(new ConnectionClosedException());
    }

    /**
     * Rejects every pending request with the given exception and empties the
     * queue.
     *
     * @param Exception $exception
     */
    protected function rejectPendingRequestsWith(Exception $exception)
    {
        while ($this->hasPendingRequests()) {
            /* @var $request Request */
            $request = array_shift($this->requests);
            $request->reject($exception);
        }
    }

    /**
     * @return bool true when at least one request is still waiting for a response
     */
    protected function hasPendingRequests()
    {
        return !empty($this->requests);
    }
}
191