Client   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 141
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 50
dl 0
loc 141
rs 10
c 0
b 0
f 0
wmc 16

6 Methods

Rating   Name   Duplication   Size   Complexity  
A end() 0 6 2
A resolveRequests() 0 19 6
A __call() 0 17 3
A __construct() 0 7 1
A setConnectionHandlers() 0 14 2
A close() 0 13 2
1
<?php
2
3
namespace seregazhuk\React\Memcached;
4
5
use Evenement\EventEmitter;
6
use React\Promise\Promise;
7
use React\Promise\PromiseInterface;
8
use seregazhuk\React\Memcached\Connection\Connection;
9
use seregazhuk\React\Memcached\Exception\ConnectionClosedException;
10
use seregazhuk\React\Memcached\Exception\Exception;
11
use seregazhuk\React\Memcached\Exception\WrongCommandException;
12
use seregazhuk\React\Memcached\Protocol\Parser;
13
use seregazhuk\React\Memcached\Request\Request;
14
use seregazhuk\React\Memcached\Request\RequestsPool;
15
16
/**
 * Memcached client built on top of a Connection and a protocol Parser.
 *
 * Commands are not declared as real methods: every call listed below goes
 * through __call(), which builds the raw command with the parser, writes it to
 * the connection and returns a promise for the pending request. When data
 * arrives, each parsed response is handed to the next request taken from the
 * pool.
 *
 * Events emitted by this client:
 *  - `error`: with a ConnectionClosedException, when the connection reports
 *    `close` while the client is not ending.
 *  - `close`: from close(), right after the connection has been closed.
 *
 * @method PromiseInterface set(string $key, mixed $value, int $flag = 0, int $exp = 0)
 * @method PromiseInterface version()
 * @method PromiseInterface verbosity(int $level)
 * @method PromiseInterface flushAll()
 * @method PromiseInterface get($key)
 * @method PromiseInterface delete($key)
 * @method PromiseInterface replace($key, $value)
 * @method PromiseInterface incr($key, $value)
 * @method PromiseInterface decr($key, $value)
 * @method PromiseInterface stats()
 * @method PromiseInterface touch($key, $exp)
 * @method PromiseInterface add($key, $value)
 */
class Client extends EventEmitter
{
    /**
     * Message used whenever a response shows up and no request is waiting for it.
     */
    private const UNEXPECTED_RESPONSE = 'Received unexpected response, no matching request found';

    /**
     * Builds raw commands and parses raw/per-command responses.
     *
     * @var Parser
     */
    private $parser;

    /**
     * Requests that were written to the connection and still await a response.
     *
     * @var RequestsPool
     */
    private $pool;

    /**
     * Indicates that the connection is closed.
     *
     * @var bool
     */
    private $isClosed = false;

    /**
     * Indicates that we don't accept new requests but we are still waiting for
     * pending requests to be resolved.
     *
     * @var bool
     */
    private $isEnding = false;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * Stores the collaborators, creates an empty requests pool and subscribes
     * to the connection events.
     *
     * @param Connection $connection
     * @param Parser $parser
     */
    public function __construct(Connection $connection, Parser $parser)
    {
        $this->parser = $parser;
        $this->connection = $connection;
        $this->pool = new RequestsPool();

        $this->setConnectionHandlers();
    }

    /**
     * Subscribes to the `data`, `failed` and `close` events of the connection.
     */
    protected function setConnectionHandlers(): void
    {
        // Incoming chunk: split it into responses and hand them to pending requests.
        $this->connection->on('data', function ($chunk) {
            $parsed = $this->parser->parseRawResponse($chunk);
            $this->resolveRequests($parsed);
        });

        // Connection failure: nothing pending can be answered any more.
        $this->connection->on('failed', function () {
            $this->pool->rejectAll(new ConnectionClosedException());
        });

        // A close that was not requested through end()/close() is reported as an error.
        $this->connection->on('close', function () {
            if (!$this->isEnding) {
                $this->emit('error', [new ConnectionClosedException()]);
            }
        });
    }

    /**
     * Sends the command named $name with the given arguments.
     *
     * The returned promise is rejected right away with a
     * ConnectionClosedException when the client is ending, or with the
     * WrongCommandException raised while building or writing the command.
     * Otherwise the request is added to the pool of pending requests.
     *
     * @param string $name
     * @param array $args
     * @return Promise|PromiseInterface
     */
    public function __call(string $name, array $args)
    {
        $request = new Request($name);

        if ($this->isEnding) {
            $request->reject(new ConnectionClosedException());

            return $request->promise();
        }

        try {
            $command = $this->parser->makeCommand($name, $args);
            $this->connection->write($command);
            $this->pool->add($request);
        } catch (WrongCommandException $e) {
            $request->reject($e);
        }

        return $request->promise();
    }

    /**
     * Hands each response to the next pending request taken from the pool.
     *
     * A request is resolved with the parsed response, or rejected when the
     * parser raises a WrongCommandException for it. Once the client is ending
     * and the pool has been drained, the connection is closed.
     *
     * @param string[] $responses
     * @throws Exception when the pool is empty on entry, or when the batch
     *                   contains more responses than there are pending requests
     */
    public function resolveRequests(array $responses): void
    {
        if ($this->pool->isEmpty()) {
            throw new Exception(self::UNEXPECTED_RESPONSE);
        }

        try {
            foreach ($responses as $response) {
                // The pool may run dry in the middle of a batch; without this
                // guard the surplus response would be paired with no request.
                if ($this->pool->isEmpty()) {
                    throw new Exception(self::UNEXPECTED_RESPONSE);
                }

                $request = $this->pool->shift();

                try {
                    $parsedResponse = $this->parser->parseResponse($request->command(), $response);
                    $request->resolve($parsedResponse);
                } catch (WrongCommandException $exception) {
                    $request->reject($exception);
                }
            }
        } finally {
            // Runs on the failure path too, so an ending client is still closed
            // once nothing is pending.
            if ($this->isEnding && $this->pool->isEmpty()) {
                $this->close();
            }
        }
    }

    /**
     * Closes the connection when all requests are resolved
     *
     * New requests are refused from this point on. If nothing is pending the
     * connection is closed immediately; otherwise resolveRequests() closes it
     * after the pool has been drained.
     */
    public function end(): void
    {
        $this->isEnding = true;

        if ($this->pool->isEmpty()) {
            $this->close();
        }
    }

    /**
     * Forces closing the connection and rejects all pending requests
     *
     * Calling it again after the client has been closed does nothing.
     */
    public function close(): void
    {
        if ($this->isClosed) {
            return;
        }

        // Flags are set before closing the connection so that the `close`
        // handler registered on it does not report this as an error.
        $this->isEnding = true;
        $this->isClosed = true;

        $this->connection->close();
        $this->emit('close');

        $this->pool->rejectAll(new ConnectionClosedException());
    }
}
173