Passed
Branch master (17f28c)
by Tamás
02:14
created

Stream   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 263
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 81.01%

Importance

Changes 0
Metric Value
wmc 27
lcom 1
cbo 5
dl 0
loc 263
ccs 64
cts 79
cp 0.8101
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
B connect() 0 34 4
A checkResponseStatusCode() 0 8 2
A getParams() 0 12 2
A setRequestParameters() 0 5 1
A handleMessage() 0 5 1
A isMessage() 0 10 2
A setLogger() 0 5 1
A readChunk() 0 4 1
A readNextChunkSize() 0 14 3
B readStream() 0 26 5
A shouldChunkBeHandled() 0 4 2
A isEmptyChunk() 0 4 2
1
<?php
2
/**
3
 * Created by PhpStorm.
4
 * User: tamaskovacs
5
 * Date: 2017. 01. 14.
6
 * Time: 16:32
7
 */
8
9
namespace Callisto;
10
11
12
use Psr\Log\LoggerInterface;
13
use Psr\Log\NullLogger;
14
15
class Stream extends Psr7Stream
16
{
17
	/**
18
	 * Streaming API base URL.
19
	 */
20
	const BASE_URL = 'https://stream.twitter.com';
21
22
23
	/**
24
	 * @var Oauth
25
	 */
26
	protected $oauth;
27
28
	/**
29
	 * Streaming endpoint.
30
	 *
31
	 * @var string
32
	 */
33
	protected $endpoint = '';
34
35
	/**
36
	 * Http method to use when connecting to the streaming API.
37
	 *
38
	 * @var string
39
	 */
40
	protected $httpMethod = 'GET';
41
42
	/**
43
	 * @var LoggerInterface
44
	 */
45
	protected $logger;
46
47
	/**
48
	 * Parameters to use to filter the statuses.
49
	 *
50
	 * @var \Callisto\RequestParameter[]
51
	 */
52
	protected $requestParameters = [];
53
54
55
	/**
56
	 * Stream constructor.
57
	 *
58
	 * @param Oauth $oauth
59
	 */
60 3
	public function __construct(Oauth $oauth)
61
	{
62 3
		$this->oauth = $oauth;
63 3
		$this->logger = new NullLogger();
64 3
	}
65
66
	/**
67
	 * Opens a connection to the Twitter Streaming API.
68
	 */
69 3
	public function connect() : void
70
	{
71 3
		if (is_resource($this->connection)) {
72 2
			$this->logger->info('Connection already opened.');
73 2
			return;
74
		}
75
76 3
		$this->logger->info('Opening new connection.');
77
78 3
		$request = $this->oauth->getOauthRequest(
79 3
			$this->getParams(),
80 3
			$this->httpMethod,
81 3
			self::BASE_URL,
82 3
			$this->endpoint
83
		);
84
85 3
		$this->connection = fsockopen('ssl://stream.twitter.com', 443);
86 3
		stream_set_blocking($this->connection, true);
87 3
		$this->write($request);
88
89 3
		$response = [];
90 3
		while (!$this->eof()) {
91 3
			$line = trim((string)$this->readLine());
92 3
			if (empty($line)) {
93 3
				break;
94
			}
95
96 3
			$response[] = $line;
97
		}
98
99 3
		$this->checkResponseStatusCode($response[0]);
100
101 2
		$this->logger->info('Connection successful.', $response);
102 2
	}
103
104
	/**
105
	 * Checks the HTTP response code recieved from the server.
106
	 *
107
	 * @param string $response
108
	 * @throws \Exception If the response code different than 200.
109
	 */
110 3
	protected function checkResponseStatusCode($response)
111
	{
112 3
		preg_match('/^HTTP\/1\.1 ([0-9]{3}).*$/', $response, $matches);
113 3
		if (200 !== (int)$matches[1]) {
114 1
			$this->logger->critical('Connection error', [$response]);
115 1
			throw new \Exception('Connection error: ' . $response);
116
		}
117 2
	}
118
119
	/**
120
	 * Returns the parameters to use in the request.
121
	 *
122
	 * @return array
123
	 */
124 3
	protected function getParams() : array
125
	{
126
		$return = [
127
			'stall_warnings' => 'true'
128 3
		];
129
130 3
		foreach ($this->requestParameters as $filter) {
131
			$return[$filter->getKey()] = $filter->getValue();
132
		}
133
134 3
		return $return;
135
	}
136
137
	/**
138
	 * Sets the filters to use in the request.
139
	 *
140
	 * @param RequestParameter[] $requestParameters
141
	 * @return $this Fluent interface.
142
	 */
143
	public function setRequestParameters($requestParameters)
144
	{
145
		$this->requestParameters = $requestParameters;
146
		return $this;
147
	}
148
149
	/**
150
	 * Handles the message from twitter.
151
	 *
152
	 * @param string $messageJson
153
	 */
154
	private function handleMessage(string $messageJson) : void
155
	{
156
		$message = json_decode($messageJson);
157
		$this->logger->info('Message received', [$message]);
158
	}
159
160
	/**
161
	 * Determines if the received json is a message from twitter.
162
	 *
163
	 * @param string $status
164
	 * @return bool
165
	 */
166 2
	private function isMessage(string $status) : bool
167
	{
168 2
		$testStr = substr($status, 0, 14);
169 2
		return !('{"created_at":' == $testStr);
170
		if ('{"created_at":' == $testStr) {
0 ignored issues
show
Unused Code introduced by
This if statement, and the following return statement can be replaced with return !('{"created_at":' == $testStr);.
Loading history...
Unused Code introduced by
if ('{"created_at":' == ...) { return false; } does not seem to be reachable.

This check looks for unreachable code. It uses sophisticated control flow analysis techniques to find statements which will never be executed.

Unreachable code is most often the result of return, die or exit statements that have been added for debug purposes.

function fx() {
    try {
        doSomething();
        return true;
    }
    catch (\Exception $e) {
        return false;
    }

    return false;
}

In the above example, the last return false will never be executed, because a return statement has already been met in every possible execution path.

Loading history...
171
			return false;
172
		}
173
174
		return true;
175
	}
176
177
	/**
178
	 * Override the default NullLogger
179
	 *
180
	 * @param LoggerInterface $logger
181
	 * @return Stream $this Fluent interface.
182
	 */
183
	public function setLogger(LoggerInterface $logger) : Stream
184
	{
185
		$this->logger = $logger;
186
		return $this;
187
	}
188
189
	/**
190
	 * Reads the next chunk of $chunkSize from the stream.
191
	 *
192
	 * @param int $chunkSize
193
	 * @return string
194
	 */
195 2
	protected function readChunk(int $chunkSize) : string
196
	{
197 2
		return $this->read($chunkSize);
198
	}
199
200
	/**
201
	 * Reads the size of the next chunk from the stream.
202
	 *
203
	 * @return int
204
	 * @throws \Exception
205
	 */
206 2
	protected function readNextChunkSize() : int
207
	{
208 2
		while (!$this->eof()) {
209 2
			$line = trim($this->readLine());
210
211 2
			if (!empty($line)) {
212 2
				$chunkSize = hexdec($line);
213 2
				return (int)$chunkSize;
214
			}
215
		}
216
217
		$this->logger->error('Connection closed.');
218
		throw new \Exception('Connection closed.');
219
	}
220
221
	/**
222
	 * Connects to the Twitter API and starts reading the stream.
223
	 *
224
	 * When it recieves a new status it will be passed on to the @link self::enqueueStatus() method.
225
	 *
226
	 * @return \Generator
227
	 */
228 2
	public function readStream() : \Generator
229
	{
230 2
		$this->connect();
231
232 2
		$status = '';
233 2
		while (!$this->eof()) {
234 2
			$chunkSize = $this->readNextChunkSize();
235
236 2
			if ($this->isEmptyChunk($chunkSize)) {
237
				continue;
238
			}
239
240 2
			$chunk = $this->readChunk($chunkSize);
241 2
			$status .= $chunk;
242
243 2
			if ($this->shouldChunkBeHandled($chunk, $chunkSize)) {
244 2
				if ($this->isMessage($status)) {
245
					$this->handleMessage($status);
246
				} else {
247 2
					yield $status;
248
				}
249
250 2
				$status = '';
251
			}
252
		}
253 2
	}
254
255
	/**
256
	 * Decides weather the read chunk is the end of a complete json object.
257
	 *
258
	 * @param string $chunk
259
	 * @param int $chunkSize
260
	 * @return bool
261
	 */
262 2
	protected function shouldChunkBeHandled($chunk, $chunkSize)
263
	{
264 2
		return "\r\n" == substr($chunk, $chunkSize - 2, 2) || $this->eof();
265
	}
266
267
	/**
268
	 * Decides weather the read chunk is empty or not.
269
	 *
270
	 * @param int $chunkSize
271
	 * @return bool
272
	 */
273 2
	protected function isEmptyChunk($chunkSize)
274
	{
275 2
		return 2 == $chunkSize || 0 == $chunkSize;
276
	}
277
}
278