Connection::sendRequest()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
1
<?php
2
namespace evseevnn\Cassandra;
3
use evseevnn\Cassandra\Cluster\Node;
4
use evseevnn\Cassandra\Enum;
5
use evseevnn\Cassandra\Exception\ConnectionException;
6
use evseevnn\Cassandra\Protocol\Frame;
7
use evseevnn\Cassandra\Protocol\Request;
8
use evseevnn\Cassandra\Protocol\Response;
9
10
class Connection {
11
12
	/**
13
	 * @var Cluster
14
	 */
15
	private $cluster;
16
17
	/**
18
	 * @var Node
19
	 */
20
	private $node;
21
22
	/**
23
	 * @var resource
24
	 */
25
	private $connection;
26
27
	/**
28
	 * @param Cluster $cluster
29
	 */
30
	public function __construct(Cluster $cluster) {
31
		$this->cluster = $cluster;
32
	}
33
34
	public function connect() {
35
		try {
36
			$this->node = $this->cluster->getRandomNode();
37
			$this->connection = $this->node->getConnection();
38
		} catch (ConnectionException $e) {
39
			$this->connect();
40
		}
41
	}
42
43
	/**
44
	 * @return bool
45
	 */
46
	public function disconnect() {
47
		return socket_shutdown($this->connection);
48
	}
49
50
	/**
51
	 * @return bool
52
	 */
53
	public function isConnected() {
54
		return $this->connection !== null;
55
	}
56
57
	/**
58
	 * @param Request $request
59
	 * @return \evseevnn\Cassandra\Protocol\Response
60
	 */
61
	public function sendRequest(Request $request) {
62
		$frame = new Frame(Enum\VersionEnum::REQUEST, $request->getType(), $request);
63
		socket_write($this->connection, $frame);
64
		return $this->getResponse();
65
	}
66
67
	/**
68
	 * @param $length
69
	 * @throws Exception\ConnectionException
70
	 * @return string
71
	 */
72
	private function fetchData($length) {
73
		$data = socket_read($this->connection, $length);
74
		while (strlen($data) < $length) {
75
			$data .= socket_read($this->connection, $length);
76
		}
77
		if (socket_last_error($this->connection) == 110) {
78
			throw new ConnectionException('Connection timed out');
79
		}
80
81
		return $data;
82
	}
83
84
	private function getResponse() {
85
		$data = $this->fetchData(8);
86
		$data = unpack('Cversion/Cflags/cstream/Copcode/Nlength', $data);
87
		if ($data['length']) {
88
			$body = $this->fetchData($data['length']);
89
		} else {
90
			$body = '';
91
		}
92
93
		return new Response($data['opcode'], $body);
94
	}
95
96
	/**
97
	 * @return \evseevnn\Cassandra\Cluster\Node
98
	 */
99
	public function getNode() {
100
		return $this->node;
101
	}
102
}
103