Passed
Push — master ( a4c2d3...394828 )
by Tamás
02:21
created

Stream::isEmptyChunk()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 1
crap 2
1
<?php
2
/**
3
 * Created by PhpStorm.
4
 * User: tamaskovacs
5
 * Date: 2017. 01. 14.
6
 * Time: 16:32
7
 */
8
9
namespace Callisto;
10
11
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
15
class Stream extends Psr7Stream
16
{
17
	/**
	 * Base URL (scheme and host) of the Twitter Streaming API.
	 *
	 * Passed to the OAuth helper when the connection request is built.
	 */
	public const BASE_URL = 'https://stream.twitter.com';

	/**
	 * OAuth helper whose getOauthRequest() produces the request that is
	 * written to the socket when connecting.
	 *
	 * @var Oauth
	 */
	protected $oauth;

	/**
	 * Streaming endpoint handed to the OAuth helper together with BASE_URL.
	 *
	 * Empty by default; presumably overridden by concrete stream classes.
	 *
	 * @var string
	 */
	protected $endpoint = '';

	/**
	 * HTTP method to use when connecting to the streaming API.
	 *
	 * @var string
	 */
	protected $httpMethod = 'GET';

	/**
	 * Logger receiving connection and message events.
	 *
	 * @var LoggerInterface
	 */
	protected $logger;

	/**
	 * Parameters used to filter the statuses; each one contributes a
	 * key/value pair to the request parameters.
	 *
	 * @var \Callisto\RequestParameter[]
	 */
	protected $requestParameters = [];
53
54
55
	/**
	 * Creates a stream that signs its requests with the given OAuth helper.
	 *
	 * Logging is disabled (NullLogger) until setLogger() is called.
	 *
	 * @param Oauth $oauth Helper used to build the connection request.
	 */
	public function __construct(Oauth $oauth)
	{
		// Start with a no-op logger so logging calls are always safe.
		$this->logger = new NullLogger();
		$this->oauth = $oauth;
	}
65
66
	/**
	 * Opens a connection to the Twitter Streaming API.
	 *
	 * When a connection resource is already open this only logs and returns.
	 * Otherwise an SSL socket is opened, the OAuth request is written to it
	 * and the response header lines are read up to the first blank line. The
	 * first header line is then validated as the HTTP status line.
	 *
	 * @throws \Exception If the socket cannot be opened or the response
	 *                    status is not 200.
	 */
	public function connect() : void
	{
		if (is_resource($this->connection)) {
			$this->logger->info('Connection already opened.');
			return;
		}

		$this->logger->info('Opening new connection.');

		$request = $this->oauth->getOauthRequest(
			$this->getParams(),
			$this->httpMethod,
			self::BASE_URL,
			$this->endpoint
		);

		// fsockopen() returns false on failure; fail loudly here instead of
		// handing a boolean to the stream functions below.
		$connection = fsockopen('ssl://stream.twitter.com', 443, $errorCode, $errorMessage);
		if (!is_resource($connection)) {
			$this->logger->critical('Connection error', [$errorCode, $errorMessage]);
			throw new \Exception('Connection error: ' . $errorMessage);
		}

		$this->connection = $connection;
		stream_set_blocking($this->connection, true);
		$this->write($request);

		// Collect the response header lines; a blank line ends the header.
		$response = [];
		while (!$this->eof()) {
			$line = trim((string)$this->readLine());
			if ('' === $line) {
				break;
			}

			$response[] = $line;
		}

		// The server may close the connection before sending any header line,
		// so fall back to an empty status line, which fails the status check.
		$this->checkResponseStatusCode($response[0] ?? '');

		$this->logger->info('Connection successful.', $response);
	}
103
104
	/**
	 * Checks the HTTP response code received from the server.
	 *
	 * @param string $response The status line, i.e. the first line of the HTTP response.
	 * @throws \Exception If the line is not an HTTP/1.1 status line or the
	 *                    response code is different than 200.
	 */
	protected function checkResponseStatusCode($response)
	{
		$matched = preg_match('/^HTTP\/1\.1 ([0-9]{3}).*$/', (string)$response, $matches);

		// Only read the captured code when the pattern actually matched;
		// a malformed status line is treated like any other failed response.
		if (1 === $matched && 200 === (int)$matches[1]) {
			return;
		}

		$this->logger->critical('Connection error', [$response]);
		throw new \Exception('Connection error: ' . $response);
	}
118
119
	/**
	 * Builds the parameter list sent with the connection request.
	 *
	 * Always contains 'stall_warnings'; every configured request parameter
	 * then adds (or overwrites) one entry under its own key.
	 *
	 * @return array
	 */
	protected function getParams() : array
	{
		$params = ['stall_warnings' => 'true'];

		foreach ($this->requestParameters as $parameter) {
			$name = $parameter->getKey();
			$params[$name] = $parameter->getValue();
		}

		return $params;
	}
136
137
	/**
	 * Replaces the request parameters used to filter the stream.
	 *
	 * The given list is stored as-is and read when the connection request
	 * is built.
	 *
	 * @param RequestParameter[] $requestParameters
	 * @return $this Fluent interface.
	 */
	public function setRequestParameters($requestParameters)
	{
		$this->requestParameters = $requestParameters;

		return $this;
	}
148
149
	/**
	 * Handles a message received from Twitter by decoding and logging it.
	 *
	 * @param string $messageJson Raw JSON payload of the message.
	 */
	private function handleMessage(string $messageJson) : void
	{
		// The decoded payload is passed to the logger as its only context entry.
		$context = [json_decode($messageJson)];
		$this->logger->info('Message received', $context);
	}
159
160
	/**
	 * Tells whether the received JSON is a message rather than a status.
	 *
	 * A payload whose first 14 bytes are '{"created_at":' is not considered a
	 * message (presumably it is a status/tweet); everything else is.
	 *
	 * @param string $status Raw JSON payload read from the stream.
	 * @return bool False when the payload starts with '{"created_at":', true otherwise.
	 */
	private function isMessage(string $status) : bool
	{
		return '{"created_at":' !== substr($status, 0, 14);
	}
175
176
	/**
	 * Replaces the logger (a NullLogger unless set otherwise).
	 *
	 * @param LoggerInterface $logger Logger that should receive the stream's log entries.
	 * @return Stream $this Fluent interface.
	 */
	public function setLogger(LoggerInterface $logger) : Stream
	{
		$this->logger = $logger;

		return $this;
	}
187
188
	/**
	 * Reads the next chunk of $chunkSize bytes from the stream.
	 *
	 * A single read on a network stream may return fewer bytes than
	 * requested, so reading continues until the whole chunk has arrived, the
	 * stream reports end-of-file, or a read yields no data.
	 *
	 * @param int $chunkSize Number of bytes the chunk consists of.
	 * @return string The bytes read; shorter than $chunkSize only if the stream ended early.
	 */
	protected function readChunk(int $chunkSize) : string
	{
		$chunk = (string)$this->read($chunkSize);

		while (strlen($chunk) < $chunkSize && !$this->eof()) {
			$more = (string)$this->read($chunkSize - strlen($chunk));
			if ('' === $more) {
				// No progress; stop instead of spinning on a stalled stream.
				break;
			}

			$chunk .= $more;
		}

		return $chunk;
	}
198
199
	/**
	 * Reads the size of the next chunk from the stream.
	 *
	 * Blank lines are skipped; the first non-blank line is parsed as a
	 * hexadecimal chunk size.
	 *
	 * @return int Size of the next chunk in bytes (0 for a zero-size chunk).
	 * @throws \Exception If the stream ends before a size line is read.
	 */
	protected function readNextChunkSize() : int
	{
		while (!$this->eof()) {
			$line = trim((string)$this->readLine());

			// Compare with '' explicitly: empty('0') is true, which would make
			// a zero-size chunk indistinguishable from a blank line.
			if ('' === $line) {
				continue;
			}

			// Drop an optional chunk extension ("1a;name=value"); hexdec()
			// would otherwise pick up hex-looking characters from it.
			$size = trim(explode(';', $line, 2)[0]);

			return (int)hexdec($size);
		}

		$this->logger->error('Connection closed.');
		throw new \Exception('Connection closed.');
	}
219
220
	/**
	 * Connects to the Twitter API and starts reading the stream.
	 *
	 * Chunks are accumulated until one completes a payload. A completed
	 * payload that is a message is passed to handleMessage(); any other
	 * payload is yielded to the caller as a raw string.
	 *
	 * @return \Generator Yields the raw payload of every non-message item.
	 */
	public function readStream() : \Generator
	{
		$this->connect();

		$buffer = '';
		while (!$this->eof()) {
			$size = $this->readNextChunkSize();
			if ($this->isEmptyChunk($size)) {
				continue;
			}

			$piece = $this->readChunk($size);
			$buffer .= $piece;

			// Keep accumulating until the current chunk ends a payload.
			if (!$this->shouldChunkBeHandled($piece, $size)) {
				continue;
			}

			if (!$this->isMessage($buffer)) {
				yield $buffer;
			} else {
				$this->handleMessage($buffer);
			}

			$buffer = '';
		}
	}
253
254
	/**
	 * Decides whether the read chunk is the end of a complete JSON object.
	 *
	 * That is the case when the two bytes at the end of the chunk are CRLF
	 * or, failing that, when the stream has reached end-of-file.
	 *
	 * @param string $chunk
	 * @param int $chunkSize
	 * @return bool
	 */
	protected function shouldChunkBeHandled($chunk, $chunkSize)
	{
		$tail = substr($chunk, $chunkSize - 2, 2);
		if ("\r\n" === $tail) {
			return true;
		}

		// The end-of-file state is only consulted when no CRLF terminator was found.
		return (bool)$this->eof();
	}
265
266
	/**
	 * Decides whether the read chunk is empty or not.
	 *
	 * Sizes 0 and 2 count as empty; a 2-byte chunk is presumably a bare CRLF
	 * keep-alive (TODO confirm against the streaming API behaviour).
	 *
	 * @param int $chunkSize
	 * @return bool
	 */
	protected function isEmptyChunk($chunkSize)
	{
		// Loose comparison is kept on purpose: the parameter is untyped.
		if (0 == $chunkSize) {
			return true;
		}

		return 2 == $chunkSize;
	}
276
}
277