Connection   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 83
rs 10
c 0
b 0
f 0
wmc 12

7 Methods

Rating   Name   Duplication   Size   Complexity  
A connect() 0 9 1
A onFailed() 0 4 1
A __construct() 0 5 1
A onConnected() 0 13 2
A write() 0 10 4
A cancelConnecting() 0 4 1
A close() 0 8 2
1
<?php
2
3
namespace seregazhuk\React\Memcached\Connection;
4
5
use Evenement\EventEmitter;
6
use React\Promise\PromiseInterface;
7
use React\Socket\ConnectionInterface;
8
use React\Socket\ConnectorInterface;
9
use React\Stream\DuplexStreamInterface;
10
11
/**
 * Lazily-connecting wrapper around a duplex stream obtained from a connector.
 *
 * Commands written while no writable stream is available are queued in a
 * CommandsPool and flushed once the connection has been established.
 *
 * Emitted events:
 *  - "data"   with the raw chunk received from the underlying stream,
 *  - "failed" when the connector rejects the connection attempt,
 *  - "close"  when the connection is closed (locally or by the stream).
 */
final class Connection extends EventEmitter
{
    /**
     * Currently attached stream, or null when not connected / already closed.
     *
     * @var DuplexStreamInterface|null
     */
    private $stream;

    /**
     * Address handed to the connector on every connection attempt.
     *
     * @var string
     */
    private $address;

    /**
     * @var ConnectorInterface
     */
    private $connector;

    /**
     * True between the start of connect() and its resolution or rejection.
     *
     * @var bool
     */
    private $isConnecting = false;

    /**
     * Commands waiting for a writable stream.
     *
     * @var CommandsPool
     */
    private $commandsPool;

    /**
     * @param string             $address   Address passed verbatim to the connector.
     * @param ConnectorInterface $connector Connector used to open the stream.
     */
    public function __construct(string $address, ConnectorInterface $connector)
    {
        $this->address = $address;
        $this->connector = $connector;
        $this->commandsPool = new CommandsPool();
    }

    /**
     * Starts a connection attempt to the configured address.
     *
     * @return PromiseInterface Promise chained after onConnected()/onFailed().
     */
    public function connect(): PromiseInterface
    {
        $this->isConnecting = true;

        return $this->connector
            ->connect($this->address)
            ->then(
                [$this, 'onConnected'],
                [$this, 'onFailed']
            );
    }

    /**
     * Attaches a freshly opened stream and flushes the queued commands to it.
     *
     * @param ConnectionInterface $stream Stream produced by the connector.
     */
    public function onConnected(ConnectionInterface $stream): void
    {
        $this->stream = $stream;
        $this->isConnecting = false;

        $stream->on('data', function ($chunk) {
            $this->emit('data', [$chunk]);
        });

        // Forward the stream's "close" only while it is still the attached
        // stream. close() detaches the stream before closing it, so a locally
        // initiated close does not re-enter close() and emit "close" twice,
        // and a late "close" from a replaced stream cannot tear down the
        // current connection.
        $stream->on('close', function () use ($stream) {
            if ($this->stream === $stream) {
                $this->close();
            }
        });

        // NOTE(review): the loop stops on the first falsy value returned by
        // CommandsPool::shift(); presumably that is how an empty pool is
        // signalled — confirm against CommandsPool.
        while ($command = $this->commandsPool->shift()) {
            $stream->write($command);
        }
    }

    /**
     * Rejection handler for connect(): drops queued commands and emits "failed".
     */
    public function onFailed(): void
    {
        $this->cancelConnecting();
        $this->emit('failed');
    }

    /**
     * Closes the attached stream (if any), drops queued commands and emits "close".
     */
    public function close(): void
    {
        // Detach first so the stream's own "close" listener sees a different
        // current stream and does not call back into this method.
        $stream = $this->stream;
        $this->stream = null;

        if ($stream !== null) {
            $stream->close();
        }

        $this->cancelConnecting();
        $this->emit('close');
    }

    /**
     * Writes a command to the stream, or queues it and starts connecting
     * when no writable stream is attached.
     *
     * @param string $command Raw command to send.
     */
    public function write(string $command): void
    {
        if ($this->stream !== null && $this->stream->isWritable()) {
            $this->stream->write($command);

            return;
        }

        $this->commandsPool->add($command);

        // Only one connection attempt at a time; onConnected() flushes the pool.
        if (!$this->isConnecting) {
            $this->connect();
        }
    }

    /**
     * Resets the connecting flag and discards every queued command.
     */
    private function cancelConnecting(): void
    {
        $this->isConnecting = false;
        $this->commandsPool->clear();
    }
}
96