Client::maybeStopClient()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.9332
c 0
b 0
f 0
cc 3
nc 4
nop 0
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\StompBundle\Adapter\Stomp;
4
5
use FuseSource\Stomp\Stomp as BaseClient;
6
use FuseSource\Stomp\Frame;
7
use FuseSource\Stomp\Message\Map;
8
use FuseSource\Stomp\Exception\StompException;
9
use Kaliop\QueueingBundle\Adapter\ForcedStopException;
10
11
class Client extends BaseClient
12
{
13
    public $debug = false;
14
    protected $forceStop = false;
15
    protected $forceStopReason;
16
    protected $dispatchSignals = false;
17
18
    /**
19
     * Connect to server. Reimplemented to sniff out Apollo
20
     *
21
     * @param string $username
22
     * @param string $password
23
     * @return boolean
24
     * @throws StompException
25
     */
26
    public function connect ($username = '', $password = '')
27
    {
28
        $this->_makeConnection();
29
        if ($username != '') {
30
            $this->_username = $username;
31
        }
32
        if ($password != '') {
33
            $this->_password = $password;
34
        }
35
        $headers = array('login' => $this->_username , 'passcode' => $this->_password);
36
        if ($this->clientId != null) {
37
            $headers["client-id"] = $this->clientId;
38
        }
39
        $frame = new Frame("CONNECT", $headers);
40
        $this->_writeFrame($frame);
41
        $frame = $this->readFrame();
42
43
        if ($frame instanceof Frame && $frame->command == 'CONNECTED') {
44
            $this->_sessionId = $frame->headers["session"];
45
            if (isset($frame->headers['server']) && false !== stristr(trim($frame->headers['server']), 'rabbitmq')) {
46
                $this->brokerVendor = 'RMQ';
47
            }
48
            if (isset($frame->headers['server']) && false !== strpos($frame->headers['server'], 'apache-apollo/')) {
49
                $this->brokerVendor = 'Apollo';
50
            }
51
            return true;
52
        } else {
53
            if ($frame instanceof Frame) {
54
                throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body);
55
            } else {
56
                throw new StompException("Connection not acknowledged");
57
            }
58
        }
59
    }
60
61
    /**
62
     * Register to listen to a given destination.
63
     * Reimplemented to support Apollo, and subscriptions to both ActiveMQ queues and topics
64
     *
65
     * @param string $destination Destination queue
66
     * @param array $properties
67
     * @param boolean $sync Perform request synchronously
68
     * @return boolean
69
     * @throws StompException
70
     */
71
    public function subscribe ($destination, $properties = null, $sync = null)
72
    {
73
        $headers = array('ack' => 'client');
74
        if ($this->brokerVendor == 'AMQ') {
75
            $headers['activemq.prefetchSize'] = $this->prefetchSize;
76
        } else if ($this->brokerVendor == 'RMQ') {
77
            $headers['prefetch-count'] = $this->prefetchSize;
78
        }
79
80
        if ($this->clientId != null) {
81
            if ($this->brokerVendor == 'AMQ') {
82
                if (strpos($destination, '/queue/') !== 0) {
83
                    $headers['activemq.subscriptionName'] = $this->clientId;
84
                }
85
            } else if ($this->brokerVendor == 'RMQ' || $this->brokerVendor == 'Apollo') {
86
                $headers['id'] = $this->clientId;
87
            }
88
        }
89
90
        if (isset($properties)) {
91
            foreach ($properties as $name => $value) {
92
                $headers[$name] = $value;
93
            }
94
        }
95
        $headers['destination'] = $destination;
96
        $frame = new Frame('SUBSCRIBE', $headers);
97
        $this->_prepareReceipt($frame, $sync);
0 ignored issues
show
Bug introduced by
It seems like $sync defined by parameter $sync on line 71 can also be of type null; however, FuseSource\Stomp\Stomp::_prepareReceipt() does only seem to accept boolean, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
98
        $this->_writeFrame($frame);
99
        if ($this->_waitForReceipt($frame, $sync) == true) {
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like you are loosely comparing two booleans. Considering using the strict comparison === instead.

When comparing two booleans, it is generally considered safer to use the strict comparison operator.

Loading history...
Bug introduced by
It seems like $sync defined by parameter $sync on line 71 can also be of type null; however, FuseSource\Stomp\Stomp::_waitForReceipt() does only seem to accept boolean, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
100
            $this->_subscriptions[$destination] = $properties;
101
            return true;
102
        } else {
103
            return false;
104
        }
105
    }
106
107
    /**
108
     * Write frame to server. Reimplemented to add debug support
109
     *
110
     * @param Frame $stompFrame
111
     * @throws StompException
112
     */
113
    protected function _writeFrame (Frame $stompFrame)
114
    {
115
        if (!is_resource($this->_socket)) {
116
            throw new StompException('Socket connection hasn\'t been established');
117
        }
118
119
        $data = $stompFrame->__toString();
120
        $r = fwrite($this->_socket, $data, strlen($data));
121
        if ($r === false || $r == 0) {
122
            $this->_reconnect();
123
            $this->_writeFrame($stompFrame);
124
        } else {
125
            if ($this->debug) {
126
                echo "STOMP FRAME SENT:\n  ".str_replace("\n", "\n  ", $data)."\n";
127
            }
128
        }
129
    }
130
131
    /**
132
     * Read response frame from server
133
     *
134
     * @return Frame False when no frame to read
135
     */
136
    public function readFrame ()
137
    {
138
        if (!empty($this->_waitbuf)) {
139
            return array_shift($this->_waitbuf);
140
        }
141
142
        if (!$this->hasFrameToRead()) {
143
            return false;
0 ignored issues
show
Bug Best Practice introduced by
The return type of return false; (false) is incompatible with the return type documented by Kaliop\Queueing\Plugins\...Stomp\Client::readFrame of type FuseSource\Stomp\Frame.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
144
        }
145
146
        $rb = 1024;
147
        $data = '';
148
        $end = false;
149
150
        do {
151
            $read = fgets($this->_socket, $rb);
152
            if ($read === false || $read === "") {
153
                $this->_reconnect();
154
                return $this->readFrame();
155
            }
156
            $data .= $read;
157
            if (strpos($data, "\x00") !== false) {
158
                $end = true;
159
                $data = trim($data, "\n");
160
            }
161
            $len = strlen($data);
162
        } while ($len < 2 || $end == false);
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like you are loosely comparing two booleans. Considering using the strict comparison === instead.

When comparing two booleans, it is generally considered safer to use the strict comparison operator.

Loading history...
163
164
        if ($this->debug) {
165
            echo "STOMP FRAME RECEIVED:\n  ".str_replace("\n", "\n  ", $data)."\n";
166
        }
167
168
        list ($header, $body) = explode("\n\n", $data, 2);
169
        $header = explode("\n", $header);
170
        $headers = array();
171
        $command = null;
172
        foreach ($header as $v) {
173
            if (isset($command)) {
174
                list ($name, $value) = explode(':', $v, 2);
175
                $headers[$name] = $value;
176
            } else {
177
                $command = $v;
178
            }
179
        }
180
        $frame = new Frame($command, $headers, trim($body));
181
        if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
182
            return new Map($frame);
183
        } else {
184
            return $frame;
185
        }
186
        return $frame;
0 ignored issues
show
Unused Code introduced by
return $frame; does not seem to be reachable.

This check looks for unreachable code. It uses sophisticated control flow analysis techniques to find statements which will never be executed.

Unreachable code is most often the result of return, die or exit statements that have been added for debug purposes.

function fx() {
    try {
        doSomething();
        return true;
    }
    catch (\Exception $e) {
        return false;
    }

    return false;
}

In the above example, the last return false will never be executed, because a return statement has already been met in every possible execution path.

Loading history...
187
    }
188
189
    /**
190
     * Make socket connection to the server
191
     * Reimplemented to support forcestop
192
     *
193
     * @throws StompException
194
     */
195
    protected function _makeConnection()
196
    {
197
        if (count($this->_hosts) == 0) {
198
            throw new StompException("No broker defined");
199
        }
200
201
        // force disconnect, if previous established connection exists
202
        $this->disconnect();
203
204
        $i = $this->_currentHost;
205
        $att = 0;
206
        $connected = false;
207
        $connect_errno = null;
208
        $connect_errstr = null;
209
210
        while (! $connected && $att ++ < $this->_attempts) {
211
            if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
212
                $i = rand(0, count($this->_hosts) - 1);
213
            } else {
214
                $i = ($i + 1) % count($this->_hosts);
215
            }
216
            $broker = $this->_hosts[$i];
217
            $host = $broker[0];
218
            $port = $broker[1];
219
            $scheme = $broker[2];
220
            if ($port == null) {
221
                $port = $this->_defaultPort;
222
            }
223
            if ($this->_socket != null) {
224
                fclose($this->_socket);
225
                $this->_socket = null;
226
            }
227
228
            $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds);
229
230
            $this->maybeStopClient();
231
232
            if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
233
                throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})");
234
            } else if (is_resource($this->_socket)) {
235
                $connected = true;
236
                $this->_currentHost = $i;
237
                break;
238
            }
239
        }
240
        if (! $connected) {
241
            throw new StompException("Could not connect to a broker");
242
        }
243
    }
244
245
    public function setHandleSignals($doHandle)
246
    {
247
        $this->dispatchSignals = $doHandle;
248
    }
249
250
    public function forceStop($reason = '')
251
    {
252
        $this->forceStop = true;
253
        $this->forceStopReason = $reason;
254
    }
255
256
    /**
257
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
258
     *
259
     * @throws ForcedStopException
260
     */
261
    protected function maybeStopClient()
262
    {
263
        if ($this->dispatchSignals) {
264
            pcntl_signal_dispatch();
265
        }
266
267
        if ($this->forceStop) {
268
            throw new ForcedStopException($this->forceStopReason);
269
        }
270
    }
271
}
272