Stream::isEmptyChunk()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 1
crap 2
1
<?php
2
/**
3
 * Created by PhpStorm.
4
 * User: tamaskovacs
5
 * Date: 2017. 01. 14.
6
 * Time: 16:32
7
 */
8
9
namespace Callisto;
10
11
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
15
abstract class Stream extends Psr7Stream
16
{
17
	/**
18
	 * Streaming API base URL.
19
	 */
20
	const BASE_URL = 'https://stream.twitter.com';
21
22
23
	/**
24
	 * @var Oauth
25
	 */
26
	protected $oauth;
27
28
	/**
29
	 * Streaming endpoint.
30
	 *
31
	 * @var string
32
	 */
33
	protected $endpoint = '';
34
35
	/**
36
	 * Http method to use when connecting to the streaming API.
37
	 *
38
	 * @var string
39
	 */
40
	protected $httpMethod = 'GET';
41
42
	/**
43
	 * @var LoggerInterface
44
	 */
45
	protected $logger;
46
47
	/**
48
	 * Parameters to use to filter the statuses.
49
	 *
50
	 * @var \Callisto\RequestParameter[]
51
	 */
52
	protected $requestParameters = [];
53
54
55
	/**
56
	 * Stream constructor.
57
	 *
58
	 * @param Oauth $oauth
59
	 */
60 18
	public function __construct(Oauth $oauth)
61
	{
62 18
		$this->oauth = $oauth;
63 18
		$this->logger = new NullLogger();
64 18
	}
65
66
	/**
67
	 * Opens a connection to the Twitter Streaming API.
68
	 */
69 9
	public function connect() : void
70
	{
71 9
		if (is_resource($this->connection)) {
72 4
			$this->logger->info('Connection already opened.');
73 4
			return;
74
		}
75
76 9
		$this->logger->info('Opening new connection.');
77
78 9
		$request = $this->oauth->getOauthRequest(
79 9
			$this->getParams(),
80 9
			$this->httpMethod,
81 9
			self::BASE_URL,
82 9
			$this->endpoint
83
		);
84
85 9
		$this->connection = fsockopen('ssl://stream.twitter.com', 443);
86 9
		stream_set_blocking($this->connection, true);
87 9
		$this->write($request);
88
89 9
		$response = [];
90 9
		while (!$this->eof()) {
91 9
			$line = trim((string)$this->readLine());
92 9
			if (empty($line)) {
93 9
				break;
94
			}
95
96 9
			$response[] = $line;
97
		}
98
99 9
		$this->checkResponseStatusCode($response[0]);
100
101 8
		$this->logger->info('Connection successful.', $response);
102 8
	}
103
104
	/**
105
	 * Checks the HTTP response code recieved from the server.
106
	 *
107
	 * @param string $response
108
	 * @throws Exception\ConnectionException If the response code different than 200.
109
	 */
110 9
	protected function checkResponseStatusCode($response)
111
	{
112 9
		preg_match('/^HTTP\/1\.1 ([0-9]{3}).*$/', $response, $matches);
113 9
		if (200 !== (int)$matches[1]) {
114 1
			$this->logger->critical('Connection error', [$response]);
115 1
			throw new Exception\ConnectionException('Connection error: ' . $response);
116
		}
117 8
	}
118
119
	/**
120
	 * Returns the parameters to use in the request.
121
	 *
122
	 * @return array
123
	 */
124 9
	protected function getParams() : array
125
	{
126
		$return = [
127
			'stall_warnings' => 'true'
128 9
		];
129
130 9
		foreach ($this->requestParameters as $filter) {
131 1
			$return[$filter->getKey()] = $filter->getValue();
132
		}
133
134 9
		return $return;
135
	}
136
137
	/**
138
	 * Sets the filters to use in the request.
139
	 *
140
	 * @param RequestParameter[] $requestParameters
141
	 * @return $this Fluent interface.
142
	 */
143 1
	public function setRequestParameters($requestParameters)
144
	{
145 1
		$this->requestParameters = $requestParameters;
146 1
		return $this;
147
	}
148
149
	/**
150
	 * Handles the message from twitter.
151
	 *
152
	 * @param string $messageJson
153
	 */
154 1
	protected function handleMessage(string $messageJson) : void
155
	{
156 1
		$message = json_decode($messageJson);
157 1
		$this->logger->info('Message received', [$message]);
158 1
	}
159
160
	/**
161
	 * Determines if the received json is a message from twitter.
162
	 *
163
	 * @param string $status
164
	 * @return bool
165
	 */
166 3
	private function isMessage(string $status) : bool
167
	{
168 3
		$testStr = substr($status, 0, 14);
169 3
		if ('{"created_at":' == $testStr) {
170 3
			return false;
171
		}
172
173 1
		return true;
174
	}
175
176
	/**
177
	 * Override the default NullLogger
178
	 *
179
	 * @param LoggerInterface $logger
180
	 * @return Stream $this Fluent interface.
181
	 */
182 18
	public function setLogger(LoggerInterface $logger) : Stream
183
	{
184 18
		$this->logger = $logger;
185 18
		return $this;
186
	}
187
188
	/**
189
	 * Reads the next chunk of $chunkSize from the stream.
190
	 *
191
	 * @param int $chunkSize
192
	 * @return string
193
	 */
194 3
	protected function readChunk(int $chunkSize) : string
195
	{
196 3
		return $this->read($chunkSize);
197
	}
198
199
	/**
200
	 * Reads the size of the next chunk from the stream.
201
	 *
202
	 * @return int
203
	 * @throws Exception\ConnectionClosedException
204
	 */
205 3
	protected function readNextChunkSize() : int
206
	{
207 3
		while (!$this->eof()) {
208 3
			$line = trim($this->readLine());
209
210 3
			if (!empty($line)) {
211 3
				$chunkSize = hexdec($line);
212 3
				return (int)$chunkSize;
213
			}
214
		}
215
216 1
		$this->logger->error('Connection closed.');
217 1
		throw new Exception\ConnectionClosedException('Connection closed.');
218
	}
219
220
	/**
221
	 * Connects to the Twitter API and starts reading the stream.
222
	 *
223
	 * When it recieves a new status it will be passed on to the @link self::enqueueStatus() method.
224
	 *
225
	 * @return \Generator
226
	 */
227 3
	public function readStream() : \Generator
228
	{
229 3
		$this->connect();
230
231 3
		$status = '';
232 3
		while (!$this->eof()) {
233 3
			$chunkSize = $this->readNextChunkSize();
234
235 3
			if ($this->isEmptyChunk($chunkSize)) {
236 2
				continue;
237
			}
238
239 3
			$chunk = $this->readChunk($chunkSize);
240 3
			$status .= $chunk;
241
242 3
			if ($this->isJsonFinished($chunk, $chunkSize)) {
243 3
				if ($this->isMessage($status)) {
244 1
					$this->handleMessage($status);
245
				} else {
246 3
					yield $status;
247
				}
248
249 3
				$status = '';
250
			}
251
		}
252
253 2
		$this->close();
254 2
	}
255
256
	/**
257
	 * Decides weather the read chunk is the end of a complete json object.
258
	 *
259
	 * @param string $chunk
260
	 * @param int $chunkSize
261
	 * @return bool
262
	 */
263 3
	protected function isJsonFinished($chunk, $chunkSize)
264
	{
265 3
		return "\r\n" == substr($chunk, $chunkSize - 2, 2) || $this->eof();
266
	}
267
268
	/**
269
	 * Decides weather the read chunk is empty or not.
270
	 *
271
	 * @param int $chunkSize
272
	 * @return bool
273
	 */
274 3
	protected function isEmptyChunk($chunkSize)
275
	{
276 3
		return 2 == $chunkSize || 0 == $chunkSize;
277
	}
278
}
279