1
|
|
|
<?php declare(strict_types=1); |
2
|
|
|
|
3
|
|
|
namespace ApiClients\Client\Twitter; |
4
|
|
|
|
5
|
|
|
use ApiClients\Foundation\Client; |
6
|
|
|
use ApiClients\Foundation\Hydrator\CommandBus\Command\HydrateCommand; |
7
|
|
|
use ApiClients\Foundation\Transport\CommandBus\Command\StreamingRequestCommand; |
8
|
|
|
use GuzzleHttp\Psr7\Request; |
9
|
|
|
use Psr\Http\Message\RequestInterface; |
10
|
|
|
use React\EventLoop\LoopInterface; |
11
|
|
|
use Rx\Observable; |
12
|
|
|
use Rx\Operator\CutOperator; |
13
|
|
|
use Rx\React\Promise; |
14
|
|
|
use Rx\Scheduler\ImmediateScheduler; |
15
|
|
|
|
16
|
|
|
/**
 * Asynchronous client for Twitter's streaming endpoints.
 *
 * Each public method builds a PSR-7 request and hands it to stream(), which
 * returns an observable of the documents received on that connection after
 * they have been passed through the hydrator.
 */
final class AsyncStreamingClient implements AsyncStreamingClientInterface
{
    /**
     * Separator the raw stream is cut on before each piece is JSON-decoded.
     */
    const STREAM_DELIMITER = "\r\n";

    /**
     * Foundation client; its handle() method is used both to issue the
     * streaming request and to hydrate the decoded documents.
     *
     * @var Client
     */
    protected $client;

    /**
     * NOTE(review): never assigned or read anywhere in this class - confirm
     * whether it can be dropped.
     *
     * @var LoopInterface
     */
    private $loop;

    /**
     * @param Client $client Client used to dispatch request and hydrate commands.
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }

    /**
     * Open the statuses/sample stream.
     *
     * @return Observable Items produced by stream() for the sample endpoint.
     */
    public function sample(): Observable
    {
        return $this->stream(
            new Request('GET', 'https://stream.twitter.com/1.1/statuses/sample.json')
        );
    }

    /**
     * Open the statuses/filter stream, sending $filter as a form-encoded POST body.
     *
     * @param array $filter Filter parameters; encoded with http_build_query().
     *
     * @return Observable Items produced by stream() for the filter endpoint.
     */
    public function filtered(array $filter = []): Observable
    {
        // Pin the separator to '&': without it http_build_query() falls back to the
        // arg_separator.output ini setting (sometimes configured as '&amp;'), which
        // would corrupt an application/x-www-form-urlencoded body.
        $postData = http_build_query($filter, '', '&');

        return $this->stream(
            new Request(
                'POST',
                'https://stream.twitter.com/1.1/statuses/filter.json',
                [
                    'Content-Type' => 'application/x-www-form-urlencoded',
                    // PSR-7 header values are strings; strlen() gives the byte length.
                    'Content-Length' => (string) strlen($postData),
                ],
                $postData
            )
        );
    }

    /**
     * Dispatch a streaming request and turn the response into hydrated items.
     *
     * Pipeline: the promise returned by the client is converted to an observable
     * and flattened with switchLatest(), the data is cut on STREAM_DELIMITER,
     * blank pieces are dropped, the remainder is JSON-decoded and every decoded
     * document is sent through a HydrateCommand.
     *
     * @param RequestInterface $request Request describing the streaming endpoint.
     *
     * @return Observable Emits whatever the hydrator resolves for each document.
     */
    protected function stream(RequestInterface $request): Observable
    {
        // Presumably the command resolves to an observable of body chunks, which is
        // why switchLatest() is applied - confirm against StreamingRequestCommand.
        $response = Promise::toObservable(
            $this->client->handle(new StreamingRequestCommand($request))
        );

        return $response->switchLatest()->lift(function () {
            return new CutOperator(self::STREAM_DELIMITER, new ImmediateScheduler());
        })->filter(function (string $json) {
            // To keep the stream alive Twitter sends an empty line at times
            return trim($json) !== '';
        })->_ApiClients_jsonDecode()->flatMap(function (array $document) {
            // Documents carrying a "delete" key are hydrated as DeletedTweet from
            // that sub-document; everything else is hydrated as a Tweet.
            if (isset($document['delete'])) {
                return Promise::toObservable($this->client->handle(
                    new HydrateCommand('DeletedTweet', $document['delete'])
                ));
            }

            return Promise::toObservable($this->client->handle(new HydrateCommand('Tweet', $document)));
        });
    }
}
78
|
|
|
|
This method has been deprecated. The author of the class has provided an explanatory message, which should indicate whether and when the method will be removed from the class and which other method or class to use instead.