Stream::close()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 3
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
namespace Cassandra\Connection;
3
4
/**
 * Blocking TCP stream used to exchange raw bytes with a server.
 *
 * The socket is opened in the constructor via fsockopen()/pfsockopen() and
 * every I/O failure is reported as a StreamException.
 */
class Stream {

	/**
	 * Underlying socket handle; reset to null by close().
	 *
	 * @var resource|null
	 */
	protected $_stream;

	/**
	 * Connection settings. Values passed to the constructor override these defaults.
	 *
	 * 'host' and 'port' are handed to fsockopen()/pfsockopen(), 'connectTimeout'
	 * is the connect timeout given to that call, 'timeout' is applied with
	 * stream_set_timeout(), and 'persistent' selects pfsockopen() over fsockopen().
	 *
	 * @var array
	 */
	protected $_options = [
		'host'		=> null,
		'port'		=> 9042,
		'username'	=> null,
		'password'	=> null,
		'timeout'	=> 30,
		'connectTimeout'=>5,
		'persistent'=> false,
	];

	/**
	 * Merges the given options over the defaults and opens the connection.
	 *
	 * @param array $options
	 * @throws StreamException when the connection cannot be established
	 */
	public function __construct(array $options) {
		$this->_options = array_merge($this->_options, $options);

		$this->_connect();
	}

	/**
	 * Opens the socket unless one is already held, and returns the handle.
	 *
	 * @throws StreamException
	 * @return resource
	 */
	protected function _connect() {
		if (!empty($this->_stream)) return $this->_stream;

		$host = $this->_options['host'];
		$port = $this->_options['port'];
		$connectTimeout = $this->_options['connectTimeout'];

		$stream = $this->_options['persistent']
			? pfsockopen($host, $port, $errorCode, $errorMessage, $connectTimeout)
			: fsockopen($host, $port, $errorCode, $errorMessage, $connectTimeout);

		if ($stream === false){
			throw new StreamException($errorMessage, $errorCode);
		}

		// Only keep the handle once it is known to be valid, so a failed
		// attempt never leaves `false` behind in $_stream.
		$this->_stream = $stream;
		stream_set_timeout($this->_stream, $this->_options['timeout']);

		return $this->_stream;
	}

	/**
	 * @return array the effective options (defaults merged with constructor input)
	 */
	public function getOptions() {
		return $this->_options;
	}

	/**
	 * Reads exactly $length bytes, looping over short reads.
	 *
	 * @param int $length number of bytes to read; zero or less yields an empty string
	 * @throws StreamException on EOF, timeout, or an empty read
	 * @return string
	 */
	public function read($length) {
		// fread() rejects a non-positive length, and reading zero bytes has a
		// well-defined result, so answer it without touching the socket.
		if ($length <= 0) return '';

		$data = '';
		do{
			$chunk = $this->_readChunk($length);

			$data .= $chunk;
			$length -= strlen($chunk);
		}
		while($length > 0);

		return $data;
	}

	/**
	 * Performs a single read and returns whatever arrived (at most $length bytes).
	 *
	 * @param int $length maximum number of bytes to read
	 * @throws StreamException on EOF, timeout, or an empty read
	 * @return string
	 */
	public function readOnce($length){
		return $this->_readChunk($length);
	}

	/**
	 * Writes the whole of $binary, looping over partial writes.
	 *
	 * @param string $binary
	 * @throws StreamException on EOF, timeout, or when nothing could be written
	 */
	public function write($binary){
		do{
			$sentBytes = fwrite($this->_stream, $binary);

			$this->_assertAlive();

			if ($sentBytes === false || $sentBytes === 0)
				throw new StreamException("Uknown error");

			$binary = substr($binary, $sentBytes);
		}
		// Compare explicitly instead of using empty(): empty("0") is true, which
		// would silently drop a remainder consisting of the single byte "0".
		// substr() may yield false rather than '' on older PHP versions.
		while($binary !== false && $binary !== '');
	}

	/**
	 * Closes the socket. Safe to call repeatedly or when no socket is open.
	 */
	public function close(){
		// is_resource() is false for an already-closed handle, so fclose()
		// is never invoked on something that is not an open stream.
		if (is_resource($this->_stream)) {
			fclose($this->_stream);
		}

		$this->_stream = null;
	}

	/**
	 * One fread() followed by the shared failure checks.
	 *
	 * @param int $length
	 * @throws StreamException
	 * @return string non-empty data
	 */
	private function _readChunk($length) {
		$chunk = fread($this->_stream, $length);

		$this->_assertAlive();

		if ($chunk === false || $chunk === '')
			throw new StreamException("Unknown error");

		return $chunk;
	}

	/**
	 * Throws if the stream reports end-of-file or a timeout after an I/O call.
	 *
	 * @throws StreamException
	 */
	private function _assertAlive() {
		if (feof($this->_stream))
			throw new StreamException('Connection reset by peer');

		if (stream_get_meta_data($this->_stream)['timed_out'])
			throw new StreamException('Connection timed out');
	}
}
133