Completed
Push — master ( c0eda5...1ea1f4 )
by Sergey
03:04
created

Connection::onFailed()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace seregazhuk\React\Memcached\Connection;
4
5
use Evenement\EventEmitter;
6
use React\Promise\PromiseInterface;
7
use React\Socket\ConnectionInterface;
8
use React\Socket\ConnectorInterface;
9
use React\Stream\DuplexStreamInterface;
10
11
/**
 * Lazily-established connection that buffers commands until the underlying
 * stream is available.
 *
 * Events emitted by this class:
 *  - "data"   with the raw chunk received from the stream,
 *  - "failed" when the connector rejects the connection attempt,
 *  - "close"  once per teardown (explicit close() or stream-initiated close).
 */
class Connection extends EventEmitter
{
    /**
     * Stream handed over by the connector; unset until onConnected() runs.
     *
     * @var DuplexStreamInterface
     */
    protected $stream;

    /**
     * Address passed verbatim to the connector on every connect().
     *
     * @var string
     */
    protected $address;

    /**
     * @var ConnectorInterface
     */
    protected $connector;

    /**
     * True between connect() and the moment the attempt is resolved,
     * rejected or cancelled; prevents write() from starting a second attempt.
     *
     * @var bool
     */
    protected $isConnecting = false;

    /**
     * Commands queued by write() while no writable stream is available.
     *
     * @var CommandsPool
     */
    protected $commandsPool;

    /**
     * Re-entrancy guard for close().
     *
     * close() is registered as the stream's "close" listener and also closes
     * the stream itself, so without this flag an explicit close() would run
     * its body (and emit "close") twice.
     *
     * @var bool
     */
    private $isClosing = false;

    /**
     * @param string $address
     * @param ConnectorInterface $connector
     */
    public function __construct($address, ConnectorInterface $connector)
    {
        $this->address = $address;
        $this->connector = $connector;
        $this->commandsPool = new CommandsPool();
    }

    /**
     * Starts a connection attempt through the connector.
     *
     * NOTE(review): onFailed() returns nothing, so the rejection is consumed
     * here; callers should rely on the "failed" event rather than on the
     * returned promise being rejected - confirm against react/promise.
     *
     * @return PromiseInterface
     */
    public function connect()
    {
        $this->isConnecting = true;

        return $this->connector
            ->connect($this->address)
            ->then(
                [$this, 'onConnected'],
                [$this, 'onFailed']
            );
    }

    /**
     * Stores the established stream, wires its events and flushes every
     * command that was queued while the connection was pending.
     *
     * @param ConnectionInterface $stream
     */
    public function onConnected(ConnectionInterface $stream)
    {
        $this->stream = $stream;
        $this->isConnecting = false;

        // Re-emit incoming chunks under this object's own "data" event.
        $stream->on('data', function ($chunk) {
            $this->emit('data', [$chunk]);
        });

        // A close initiated by the stream tears this connection down as well.
        $stream->on('close', [$this, 'close']);

        // Loop ends on the first falsy value returned by the pool.
        while ($command = $this->commandsPool->shift()) {
            $this->stream->write($command);
        }
    }

    /**
     * Rejection handler for connect(): drops queued commands and
     * notifies listeners through the "failed" event.
     */
    public function onFailed()
    {
        $this->cancelConnecting();
        $this->emit('failed');
    }

    /**
     * Closes the stream (if any), discards queued commands and emits "close".
     *
     * Safe to call re-entrantly: nested calls made while a close is already
     * in progress (e.g. from the stream's own "close" event, or from a
     * "close" listener) return immediately, so "close" is emitted exactly
     * once per teardown.
     */
    public function close()
    {
        if ($this->isClosing) {
            return;
        }

        $this->isClosing = true;

        try {
            if ($this->stream) {
                $this->stream->close();
            }

            $this->cancelConnecting();
            $this->emit('close');
        } finally {
            // Always release the guard so a later close() still works,
            // even if the stream or a listener threw.
            $this->isClosing = false;
        }
    }

    /**
     * Writes the command immediately when a writable stream exists;
     * otherwise queues it and starts connecting unless an attempt is
     * already in flight.
     *
     * @param string $command
     */
    public function write($command)
    {
        if ($this->stream && $this->stream->isWritable()) {
            $this->stream->write($command);
            return;
        }

        $this->commandsPool->add($command);
        if (!$this->isConnecting) {
            $this->connect();
        }
    }

    /**
     * Resets the connecting flag and drops all queued commands.
     */
    private function cancelConnecting()
    {
        $this->isConnecting = false;
        $this->commandsPool->clear();
    }
}
121