StreamClient::__construct()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 8
nc 2
nop 3
1
<?php
2
namespace Zend\Firebase\Stream;
3
4
use GuzzleHttp;
5
use RuntimeException;
6
7
/**
8
 * PHP7 FIREBASE LIBRARY (http://samuelventimiglia.it/)
9
 *
10
 *
11
 * @link https://github.com/samuel20miglia/zend_Firebase
12
 * @copyright Copyright (c) 2016-now Ventimiglia Samuel - Biasin Davide
13
 * @license BSD 3-Clause License
14
 *
15
 */
16
17
/**
18
 * This class create an object of Client
19
 *
20
 * @author ghostbyte
21
 * @package ZendFirebase
22
 * @since 2016-10-28
23
 *
24
 */
25
class StreamClient
26
{
27
28
    /**
29
     * Stream pattern END_OF_MESSAGE
30
     *
31
     * @var string
32
     */
33
    const END_OF_MESSAGE = "/\r\n\r\n|\n\n|\r\r/";
34
35
    /**
36
     * Client for send request
37
     *
38
     * @var GuzzleHttp\Client $client
39
     */
40
    private $client;
41
42
    /**
43
     * Responce object from rest
44
     *
45
     * @var GuzzleHttp\Psr7\Response $response
46
     */
47
    private $response;
48
49
    /**
50
     * Request url to send request
51
     *
52
     * @var string $url
53
     */
54
    private $url;
55
56
    /**
57
     * Request options to add to url
58
     *
59
     * @var string
60
     */
61
    private $options = [];
62
63
    /**
64
     * Last received message id
65
     *
66
     * @var string $lastMessageId
67
     */
68
    private $lastMessageId;
69
70
    /**
71
     * Reconnection time in milliseconds
72
     *
73
     * @var integer $retry
74
     */
75
    private $retry = 3000;
76
77
    /**
78
     * Constructor
79
     *
80
     * @param string $url
81
     * @param integer $requestDelay
82
     * @throws InvaliArgumentException
83
     */
84
    public function __construct($url, $requestDelay, $options)
85
    {
86
        $this->url = $url;
87
        $this->retry = $requestDelay;
88
        $this->options = $options;
89
90
        if (empty($this->url)) {
91
            throw new \InvalidArgumentException('Error: url empty...');
92
        }
93
        $this->createClientObject();
94
        $this->connect();
95
    }
96
97
    /**
98
     * Create client
99
     */
100
    private function createClientObject()
101
    {
102
        $this->client = new GuzzleHttp\Client([
103
            'headers' => [
104
                'Accept' => 'text/event-stream',
105
                'Cache-Control' => 'no-cache',
106
                'allow_redirects' => true
107
            ],
108
109
            'base_uri' => $this->url,
110
        ]);
111
112
        $this->connect();
113
    }
114
115
    /**
116
     *
117
     * @return string $lastMessageId
118
     */
119
    public function getLastMessageId(): string
120
    {
121
        return $this->lastMessageId;
122
    }
123
124
    /**
125
     *
126
     * @param string $lastMessageId
127
     */
128
    public function setLastMessageId($lastMessageId)
129
    {
130
        $this->lastMessageId = $lastMessageId;
131
    }
132
133
    /**
134
     * Connect to firebase server
135
     *
136
     * @throws RuntimeException
137
     */
138
    private function connect()
139
    {
140
141
        $this->sendRequest();
142
143
        if ($this->response->getStatusCode() == 204) {
144
            throw new RuntimeException('Error: Server forbid connection retry by responding 204 status code.');
145
        }
146
    }
147
148
    /**
149
     * Create url with or without query options
150
     *
151
     * @return string
152
     */
153
    private function createUrl(): string
154
    {
155
        return $this->url . $this->options;
156
    }
157
158
    /**
159
     * Send Request
160
     */
161
    private function sendRequest()
162
    {
163
        try {
164
            $headers = [];
165
            if ($this->lastMessageId) {
166
                $headers['Last-Event-ID'] = $this->lastMessageId;
167
            }
168
169
            $this->response = $this->client->request('GET', $this->createUrl(), [
170
            'stream' => true,
171
            'headers' => $headers,
172
            ]);
173
        } catch (\GuzzleHttp\Exception\ClientException $e) {
174
            die((string)$e->getResponse()->getBody());
175
        }
176
    }
177
178
179
    /**
180
     * Returns generator that yields new event when it's available on stream.
181
     */
182
    public function getEvents()
183
    {
184
        /* initialize empty buffer */
185
        $buffer = '';
186
187
        /* bring body of response */
188
        $body = $this->response->getBody();
189
190
        /* infinte loop */
191
        while (true) {
192
            /* if server close connection - try to reconnect */
193
            if ($body->eof()) {
194
                /* wait retry period before reconnection */
195
                sleep($this->retry / 1000);
196
197
                /* reconnect */
198
                $this->connect();
199
200
                /* clear buffer since there is no sense in partial message */
201
                $buffer = '';
202
            }
203
            /* start read into stream */
204
            $buffer .= $body->read(1);
205
206
            if (preg_match(self::END_OF_MESSAGE, $buffer)) {
207
                $parts = preg_split(self::END_OF_MESSAGE, $buffer, 2);
208
209
                $rawMessage = $parts[0];
210
                $remaining = $parts[1];
211
212
                $buffer = $remaining;
213
214
                /**
215
                 * Save event into StreamEvent
216
                 *
217
                 * @var StreamEvent $event
218
                 */
219
                $event = StreamEvent::parse($rawMessage);
220
221
                yield $event;
222
            }
223
        }
224
    }
225
}
226