Completed
Push — master ( 5bf22f...cf1632 )
by Kirill
06:39
created

StreamAdapter::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.6666
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 2
1
<?php declare(strict_types = 1);
2
/**
3
 * This file is part of GitterApi package.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
namespace Gitter\Adapters;
9
10
use Clue\React\Buzz\Browser;
11
use Clue\React\Buzz\Io\Sender;
12
use Clue\React\Buzz\Message\MessageFactory;
13
use Gitter\Client;
14
use Gitter\Route;
15
use Gitter\Support\JsonStream;
16
use Gitter\Support\Observer;
17
use Monolog\Logger;
18
use Psr\Http\Message\ResponseInterface;
19
use React\EventLoop\ExtEventLoop;
20
use React\EventLoop\Factory as EventLoop;
21
use React\EventLoop\LibEventLoop;
22
use React\EventLoop\LibEvLoop;
23
use React\EventLoop\LoopInterface;
24
use React\EventLoop\StreamSelectLoop;
25
use React\Promise\Promise;
26
use React\Stream\ReadableStreamInterface;
27
28
/**
29
 * Class HttpAdapter
30
 * @package Gitter\Adapters
31
 */
32
class StreamAdapter extends AbstractClient implements StreamAdapterInterface
33
{
34
    /**
35
     * @var Browser
36
     */
37
    private $browser;
38
39
    /**
40
     * @var Sender
41
     */
42
    private $sender;
43
44
    /**
45
     * @var MessageFactory
46
     */
47
    private $messages;
48
49
    /**
50
     * @var ExtEventLoop|LibEventLoop|LibEvLoop|StreamSelectLoop
51
     */
52
    private $loop;
53
54
    /**
55
     * @var Client
56
     */
57
    private $client;
58
59
    /**
60
     * HttpAdapter constructor.
61
     * @param Client $client
62
     * @param LoopInterface $loop
63
     */
64
    public function __construct(Client $client, LoopInterface $loop)
65
    {
66
        $this->client = $client;
67
        $this->loop = $loop;
0 ignored issues
show
Documentation Bug introduced by
It seems like $loop of type object<React\EventLoop\LoopInterface> is incompatible with the declared type object<React\EventLoop\E...tLoop\StreamSelectLoop> of property $loop.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
68
        $this->sender = Sender::createFromLoop($loop);
69
        $this->messages = new MessageFactory();
70
71
        $this->browser = new Browser($loop, $this->sender, $this->messages);
72
    }
73
74
    /**
75
     * @return LoopInterface
76
     */
77
    public function getEventLoop(): LoopInterface
78
    {
79
        return $this->loop;
80
    }
81
82
    /**
83
     * @param Route $route
84
     * @return Observer
85
     * @throws \InvalidArgumentException
86
     */
87
    public function request(Route $route): Observer
88
    {
89
        $observer = new Observer();
90
91
        $this->promise($route)->then(function (ResponseInterface $response) use ($observer) {
92
            $this->onConnect($response, $observer);
93
        });
94
95
        return $observer;
96
    }
97
98
    /**
99
     * @param Route $route
100
     * @return Promise
101
     * @throws \InvalidArgumentException
102
     */
103
    private function promise(Route $route): Promise
104
    {
105
        list($method, $uri) = [$route->method(), $route->build()];
106
107
        // Log request
108
        $this->client->log(' -> ' . $method . ' ' . $uri, Logger::DEBUG);
109
110
        return $this->browser
111
            ->withOptions(['streaming' => true])
112
            ->{strtolower($method)}($route->build(), $this->buildHeaders($this->client));
113
    }
114
115
    /**
116
     * @param ResponseInterface $response
117
     * @param Observer $observer
118
     */
119
    private function onConnect(ResponseInterface $response, Observer $observer)
120
    {
121
        $json = new JsonStream();
122
123
        // Log response
124
        $this->client->log(' <- ' . $response->getStatusCode() . ' ' . $response->getReasonPhrase(), Logger::DEBUG);
125
126
        /* @var $body ReadableStreamInterface */
127
        $body = $response->getBody();
128
129
        $body->on('data', function ($chunk) use ($json, $observer) {
130
            // Log response chunk
131
            $this->client->log('   <- ' . $chunk, Logger::DEBUG);
132
133
            $json->push($chunk, function ($object) use ($observer) {
134
                $observer->fire($object);
135
            });
136
        });
137
    }
138
}
139