Completed
Push — master ( 393947...6e28b0 )
by Ventimiglia
02:45 queued 10s
created

StreamClient::sendRequest()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 12
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 7
nc 2
nop 0
1
<?php
2
namespace ZendFirebase\Stream;
3
4
use GuzzleHttp;
5
use RuntimeException;
6
use phpDocumentor\Reflection\DocBlock\Tags\Generic;
7
8
/**
9
 * PHP7 FIREBASE LIBRARY (http://samuelventimiglia.it/)
10
 *
11
 *
12
 * @link https://github.com/Samuel18/zend_Firebase
13
 * @copyright Copyright (c) 2016-now Ventimiglia Samuel - Biasin Davide
14
 * @license BSD 3-Clause License
15
 *
16
 */
17
18
/**
19
 * This class creates a client object that reads an event stream over HTTP.
20
 *
21
 * @author ghostbyte
22
 * @package ZendFirebase
23
 * @since 2016-10-28
24
 *
25
 */
26
class StreamClient
{

    /**
     * Pattern matching the blank line that terminates one message on the
     * stream (CRLF CRLF, LF LF or CR CR).
     *
     * @var string
     */
    const END_OF_MESSAGE = "/\r\n\r\n|\n\n|\r\r/";

    /**
     * HTTP client used to send the stream request.
     *
     * @var GuzzleHttp\Client $client
     */
    private $client;

    /**
     * Response object of the most recent stream request.
     *
     * @var GuzzleHttp\Psr7\Response $response
     */
    private $response;

    /**
     * URL the stream request is sent to.
     *
     * @var string $url
     */
    private $url;

    /**
     * Last received message id. Empty string means "none yet"; it is
     * initialised so that getLastMessageId() can honour its string return type.
     *
     * @var string $lastMessageId
     */
    private $lastMessageId = '';

    /**
     * Reconnection time in milliseconds.
     *
     * @var integer $retry
     */
    private $retry = 3000;

    /**
     * Constructor: stores the settings, builds the HTTP client and opens the
     * stream connection.
     *
     * @param string $url          URL of the event stream
     * @param integer $requestDelay delay before reconnecting, in milliseconds
     * @throws \InvalidArgumentException when $url is empty
     * @throws RuntimeException when the server answers with status 204
     */
    public function __construct($url, $requestDelay)
    {
        /* validate first so that no state is stored for an unusable object */
        if (empty($url)) {
            throw new \InvalidArgumentException('Error: url empty...');
        }

        $this->url = $url;
        $this->retry = $requestDelay;

        $this->createClientObject();
        $this->connect();
    }

    /**
     * Create the HTTP client.
     *
     * Only builds the client; opening the connection is the caller's job
     * (the constructor calls connect() right afterwards), so the stream is
     * requested exactly once per instantiation.
     */
    private function createClientObject()
    {
        $this->client = new GuzzleHttp\Client([
            /* request option, not an HTTP header: must live outside 'headers' */
            'allow_redirects' => true,
            'headers' => [
                'Accept' => 'text/event-stream',
                'Cache-Control' => 'no-cache'
            ]
        ]);
    }

    /**
     * Get the last received message id.
     *
     * @return string $lastMessageId empty string when no id has been set
     */
    public function getLastMessageId(): string
    {
        /* cast guards against a null/non-string value stored through the setter */
        return (string) $this->lastMessageId;
    }

    /**
     * Set the last received message id; it is sent as "Last-Event-ID" header
     * on the next request.
     *
     * @param string $lastMessageId
     */
    public function setLastMessageId($lastMessageId)
    {
        $this->lastMessageId = $lastMessageId;
    }

    /**
     * Connect to firebase server
     *
     * @throws RuntimeException when the server answers with status 204
     */
    private function connect()
    {
        $this->sendRequest();

        if ($this->response->getStatusCode() === 204) {
            throw new RuntimeException('Error: Server forbid connection retry by responding 204 status code.');
        }
    }

    /**
     * Send the streaming GET request and keep its response in $this->response.
     */
    private function sendRequest()
    {
        $headers = [];

        /* compare against '' instead of relying on truthiness: "0" is a valid id */
        if ((string) $this->lastMessageId !== '') {
            $headers['Last-Event-ID'] = $this->lastMessageId;
        }

        $this->response = $this->client->request('GET', $this->url, [
            'stream' => true,
            'headers' => $headers
        ]);
    }

    /**
     * Pause for the configured retry period (milliseconds).
     *
     * Whole seconds go through sleep() and the remainder through usleep(), so
     * that delays which are not a multiple of 1000 ms are not truncated.
     */
    private function waitBeforeReconnect()
    {
        $delayMs = max(0, (int) $this->retry);

        $seconds = intdiv($delayMs, 1000);
        if ($seconds > 0) {
            sleep($seconds);
        }

        $remainderMs = $delayMs % 1000;
        if ($remainderMs > 0) {
            usleep($remainderMs * 1000);
        }
    }

    /**
     * Returns generator that yields new event when it's available on stream.
     *
     * The loop never ends on its own: when the server closes the connection
     * the client waits for the retry period and reconnects.
     *
     * @return \Generator yields the result of StreamEvent::parse() per message
     * @throws RuntimeException when a reconnection is answered with status 204
     */
    public function getEvents()
    {
        /* initialize empty buffer */
        $buffer = '';

        /* bring body of response */
        $body = $this->response->getBody();

        /* infinite loop */
        while (true) {
            /* if server close connection - try to reconnect */
            if ($body->eof()) {
                /* wait retry period before reconnection */
                $this->waitBeforeReconnect();

                /* reconnect */
                $this->connect();

                /* the old body stays at EOF forever: read from the new response */
                $body = $this->response->getBody();

                /* clear buffer since there is no sense in partial message */
                $buffer = '';
            }

            /* read the stream one byte at a time */
            $buffer .= $body->read(1);

            /* a single split tells both whether a terminator is present and where */
            $parts = preg_split(self::END_OF_MESSAGE, $buffer, 2);
            if ($parts === false || count($parts) < 2) {
                continue;
            }

            /* first part is the complete raw message, second part is kept for the next one */
            list($rawMessage, $buffer) = $parts;

            /**
             * Save event into StreamEvent
             *
             * @var StreamEvent $event
             */
            $event = StreamEvent::parse($rawMessage);

            yield $event;
        }
    }
}
203