Stream   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 152
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
dl 0
loc 152
rs 10
c 0
b 0
f 0
wmc 7
lcom 1
cbo 7

5 Methods

Rating   Name   Duplication   Size   Complexity  
B __construct() 0 33 3
B connect() 0 29 1
A reconnect() 0 4 1
A subscribe() 0 6 1
A on() 0 6 1
1
<?php
2
3
/**
4
 * This file is part of GitterBot package.
5
 *
6
 * @author Serafim <[email protected]>
7
 * @date 24.09.2015 00:00
8
 *
9
 * For the full copyright and license information, please view the LICENSE
10
 * file that was distributed with this source code.
11
 */
12
13
namespace Interfaces\Gitter\Http;
14
15
use Interfaces\Gitter\Client;
16
use Carbon\Carbon;
17
use Illuminate\Events\Dispatcher;
18
use React\HttpClient\Response;
19
use Interfaces\Gitter\Support\StreamBuffer;
20
21
/**
 * Streaming HTTP connection that turns a response body into events.
 *
 * Raw response chunks are appended to a StreamBuffer. Every value the buffer
 * hands to its subscriber is trimmed, announced as EVENT_DATA and then
 * JSON-decoded into either an EVENT_MESSAGE or an EVENT_ERROR.
 *
 * NOTE(review): the constructor connects immediately and creates its own
 * dispatcher, so the EVENT_CONNECT of the first connection is fired before
 * any listener can be registered through on().
 */
class Stream
{
    // Message part (chunk): fired as (Stream, string $chunk, Response).
    const EVENT_CHUNK = 'chunk';
    // Full message string: fired as (string $message) only.
    const EVENT_DATA = 'data';
    // Parsed as json data: fired as (Stream, mixed $decoded).
    const EVENT_MESSAGE = 'message';
    // Connection: fired as (Stream, Request) right before the request is sent.
    const EVENT_CONNECT = 'connect';
    // Errors: fired as (Stream, exception).
    const EVENT_ERROR = 'error';
    // End: fired as (Stream) once the request reports its end.
    const EVENT_END = 'end';

    /**
     * Accumulates raw chunks and notifies its subscriber with messages.
     *
     * @var StreamBuffer
     */
    protected $buffer;

    /**
     * Target URL as produced by the client's router.
     *
     * @var string|mixed
     */
    protected $url;

    /**
     * HTTP method used for every (re)connection.
     *
     * @var string
     */
    protected $method;

    /**
     * Request headers: the client's headers plus "Connection: Keep-Alive".
     *
     * @var array
     */
    protected $headers;

    /**
     * @var Client
     */
    protected $client;

    /**
     * Dispatcher carrying all EVENT_* notifications of this stream.
     *
     * @var Dispatcher|null
     */
    protected $events = null;

    /**
     * Resolves the route, wires the buffer to the dispatcher and connects.
     *
     * @param Client $client Provides the router, default headers and HTTP client.
     * @param mixed $route Route identifier handed to the client's router.
     * @param array $args Route arguments handed to the client's router.
     * @param string $method HTTP method of the streaming request.
     * @throws \InvalidArgumentException Presumably raised while resolving the
     *                                   route; not thrown by this class itself.
     */
    public function __construct(Client $client, $route, array $args, $method = 'GET')
    {
        $this->url = $client->getRouter()->route($route, $args);
        $this->method = $method;
        $this->headers = $client->getHeaders();
        $this->client = $client;
        $this->buffer = new StreamBuffer();
        $this->events = new Dispatcher();

        $this->headers['Connection'] = 'Keep-Alive';

        // A closure (not an array callable) is used so the private handler
        // stays reachable when the buffer invokes it.
        $this->buffer->subscribe(function ($message) {
            $this->handleMessage($message);
        });

        $this->connect();
    }

    /**
     * Publishes one buffered message as EVENT_DATA and its decoded form as
     * EVENT_MESSAGE, or EVENT_ERROR when it is not valid JSON.
     *
     * @param string $message Message as delivered by the stream buffer.
     * @return void
     */
    private function handleMessage($message)
    {
        $message = trim($message);

        // Compare against '' explicitly: a plain truthiness test would also
        // discard the string "0", which is a valid JSON payload.
        if ($message === '') {
            return;
        }

        $this->events->fire(static::EVENT_DATA, [$message]);

        // $message is already trimmed above, so it is decoded as is.
        $data = json_decode($message, true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            $this->events->fire(static::EVENT_ERROR, [
                $this,
                new \LogicException(json_last_error_msg()),
            ]);

            return;
        }

        $this->events->fire(static::EVENT_MESSAGE, [$this, $data]);
    }

    /**
     * Creates a new request for the stored method/URL/headers, attaches the
     * stream's handlers to it, fires EVENT_CONNECT and sends the request.
     *
     * @return \React\HttpClient\Request
     */
    public function connect()
    {
        $request = $this->client
            ->getHttpClient()
            ->request($this->method, $this->url, $this->headers);

        $request->on('response', function (Response $response) {
            // The response is captured from the enclosing scope, so the
            // listener does not depend on the emitter passing it again as a
            // second event argument.
            $response->on('data', function ($data) use ($response) {
                $data = (string)$data;
                $this->events->fire(static::EVENT_CHUNK, [$this, $data, $response]);
                $this->buffer->add($data);
            });
        });

        $request->on('end', function () {
            // Drop whatever is still buffered before announcing the end.
            $this->buffer->clear();
            $this->events->fire(static::EVENT_END, [$this]);
        });

        $request->on('error', function ($exception) {
            $this->events->fire(static::EVENT_ERROR, [$this, $exception]);
        });

        // Fired before end() so listeners receive the request before it is sent.
        $this->events->fire(static::EVENT_CONNECT, [$this, $request]);

        $request->end();

        return $request;
    }

    /**
     * Opens a fresh connection; alias of connect().
     *
     * @return \React\HttpClient\Request
     */
    public function reconnect()
    {
        return $this->connect();
    }

    /**
     * Registers a listener for decoded messages (EVENT_MESSAGE).
     *
     * @deprecated Equivalent to on(static::EVENT_MESSAGE, $callback).
     * @param callable $callback
     * @return Stream
     */
    public function subscribe(callable $callback): Stream
    {
        $this->on(static::EVENT_MESSAGE, $callback);

        return $this;
    }

    /**
     * Registers a listener on the stream's dispatcher.
     *
     * @param string|array $events One or more EVENT_* names.
     * @param mixed $listener Listener forwarded unchanged to the dispatcher.
     * @param int $priority Priority forwarded unchanged to the dispatcher.
     * @return $this
     */
    public function on($events, $listener, $priority = 0)
    {
        $this->events->listen($events, $listener, $priority);

        return $this;
    }
}
176