Issues (15)

Security Analysis: no request data

This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.

  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response of a web request that is viewed by other users. It can, for example, be used to bypass access controls or even to take over other users' accounts.
  File Exposure
File Exposure allows an attacker to gain access to local files that they should not be able to access. These files can, for example, include database credentials or other configuration files.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either explicitly passed to include, or for example via PHP's auto-loading mechanism.
  Command Injection
Command Injection enables an attacker to inject a shell command that is executed with the privileges of the web server. This can be used to expose sensitive data or to gain access to your server.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server, gaining access to user data or manipulating it.
  XPath Injection
XPath Injection enables an attacker to modify which parts of an XML document are read. If that XML document is, for example, used for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements, potentially granting permission to run unauthorized queries or to modify content inside the LDAP tree.
  Header Injection
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem including configuration files, or can be abused to freeze your web-server process.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.

src/Channel-Zmq/ZmqModel.php (10 issues)

These results are based on the legacy PHP analysis engine.

<?php

namespace Dazzle\ChannelZmq;

use Dazzle\Channel\Model\ModelInterface;
use Dazzle\Channel\Channel;
use Dazzle\ChannelZmq\Connection\Connection;
use Dazzle\ChannelZmq\Connection\ConnectionPool;
use Dazzle\ChannelZmq\Buffer\Buffer;
use Dazzle\Event\BaseEventEmitter;
use Dazzle\Loop\LoopInterface;
use Dazzle\Loop\Timer\TimerInterface;
use Dazzle\Throwable\Exception\Runtime\ExecutionException;
use Dazzle\Zmq\ZmqContext;
use Dazzle\Zmq\ZmqSocket;

abstract class ZmqModel extends BaseEventEmitter implements ModelInterface
{
    /**
     * @var int
     */
    const CONNECTOR = 2;

    /**
     * @var int
     */
    const BINDER = 1;

    /**
     * @var int
     */
    const SOCKET_UNDEFINED = 1;

    /**
     * @var int
     */
    const COMMAND_HEARTBEAT = 1;

    /**
     * @var int
     */
    const COMMAND_MESSAGE = 2;

    /**
     * @var int
     */
    const MODE_STANDARD = Channel::MODE_STANDARD;

    /**
     * @var int
     */
    const MODE_BUFFER_ONLINE = Channel::MODE_BUFFER_ONLINE;

    /**
     * @var int
     */
    const MODE_BUFFER_OFFLINE = Channel::MODE_BUFFER_OFFLINE;

    /**
     * @var int
     */
    const MODE_BUFFER = Channel::MODE_BUFFER;

    /**
     * @var LoopInterface
     */
    protected $loop;

    /**
     * @var ZmqContext
     */
    protected $context;

    /**
     * @var string
     */
    protected $id;

    /**
     * @var string
     */
    protected $endpoint;

    /**
     * @var int
     */
    protected $type;

    /**
     * @var string[]
     */
    protected $hosts;

    /**
     * @var string[]
     */
    protected $flags;

    /**
     * @var mixed[]
     */
    protected $options;

    /**
     * @var bool
     */
    protected $isConnected;

    /**
     * @var string
     */
    protected $pendingOperation;

    /**
     * @var callable
     */
    protected $connectCallback;

    /**
     * @var callable
     */
    protected $disconnectCallback;

    /**
     * @var ZmqSocket
     */
    public $socket;

    /**
     * @var Buffer
     */
    protected $buffer;

    /**
     * @var ConnectionPool
     */
    protected $connectionPool;

    /**
     * @var TimerInterface
     */
    private $hTimer;

    /**
     * @var TimerInterface
     */
    private $rTimer;

    /**
     * @param LoopInterface $loop
     * @param string[] $params
     */
    public function __construct(LoopInterface $loop, $params)
    {
        $id         = $params['id'];
        $endpoint   = $params['endpoint'];
        $type       = $params['type'];
        $hosts      = $params['host'];

        $flags = [
            'enableHeartbeat'       => isset($params['enableHeartbeat']) ? $params['enableHeartbeat'] : true,
            'enableBuffering'       => isset($params['enableBuffering']) ? $params['enableBuffering'] : true,
            'enableTimeRegister'    => isset($params['enableTimeRegister']) ? $params['enableTimeRegister'] : true
        ];

        $options = [
            'bufferSize'            => isset($params['bufferSize']) ? (int)$params['bufferSize'] : 0,
            'bufferTimeout'         => isset($params['bufferTimeout']) ? (int)$params['bufferTimeout'] : 0,
            'heartbeatInterval'     => isset($params['heartbeatInterval']) ? (int)$params['heartbeatInterval'] : 200,
            'heartbeatKeepalive'    => isset($params['heartbeatKeepalive']) ? (int)$params['heartbeatKeepalive'] : 1000,
            'timeRegisterInterval'  => isset($params['timeRegisterInterval']) ? (int)$params['timeRegisterInterval'] : 400
        ];

        $this->loop = $loop;
        $this->context = new ZmqContext($this->loop);
        $this->id = $id;
        $this->endpoint = $endpoint;
        $this->type = $type;
Documentation Bug
The property $type was declared of type integer, but $type is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
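
Applied to the constructor above, one way to address this finding is to normalise the `type` parameter to an integer before it is stored, so that later strict comparisons against `self::BINDER` and `self::CONNECTOR` can succeed. The following is a standalone sketch, not the library's code: the helper name `normalizeSocketType` is hypothetical, and the two constants only copy the values declared in the class.

```php
<?php

// Values copied from ZmqModel::BINDER and ZmqModel::CONNECTOR.
const BINDER = 1;
const CONNECTOR = 2;

/**
 * Hypothetical helper: cast a configuration value to the int the property expects.
 *
 * @param mixed $type
 * @return int
 */
function normalizeSocketType($type)
{
    return (int) $type;
}

// Assumption: the value arrives as a string, e.g. from a configuration file.
$params = [ 'type' => '2' ];

var_dump($params['type'] === CONNECTOR);                      // bool(false)
var_dump(normalizeSocketType($params['type']) === CONNECTOR); // bool(true)
```

If the callers already pass integers, correcting the `@param string[] $params` annotation may be enough and the cast becomes a safety net only.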
        $this->hosts = is_array($hosts) ? $hosts : [ $hosts ];
        $this->flags = $flags;
Documentation Bug
It seems like $flags of type array<string,string|bool...ter":"string|boolean"}> is incompatible with the declared type array<integer,string> of property $flags.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error, or the assigned type should be added to the documentation/type hint for that property.

        $this->options = $options;
Documentation Bug
It seems like $options of type array<string,integer,{"b...erInterval":"integer"}> is incompatible with the declared type array<integer,*> of property $options.
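
Both property findings point at the docblocks rather than at the assignments: `string[]` and `mixed[]` describe integer-keyed lists, while the constructor builds string-keyed maps. A minimal sketch of annotations that match the assigned values is shown here. The class name `FlagsHolder` is hypothetical, only one key per array is reproduced, and the `array<string,bool>` generic syntax is an assumption about what the analyser accepts.

```php
<?php

// Hypothetical stand-in for the $flags and $options properties of ZmqModel.
class FlagsHolder
{
    /**
     * @var array<string,bool> flag name => enabled
     */
    public $flags = [];

    /**
     * @var array<string,int> option name => numeric value
     */
    public $options = [];

    /**
     * @param array<string,mixed> $params
     */
    public function __construct(array $params)
    {
        // Casting makes the runtime types agree with the annotations above.
        $this->flags = [
            'enableHeartbeat' => isset($params['enableHeartbeat']) ? (bool) $params['enableHeartbeat'] : true,
        ];
        $this->options = [
            'heartbeatInterval' => isset($params['heartbeatInterval']) ? (int) $params['heartbeatInterval'] : 200,
        ];
    }
}

$holder = new FlagsHolder([ 'enableHeartbeat' => '0', 'heartbeatInterval' => '250' ]);

var_dump($holder->flags['enableHeartbeat']);     // bool(false)
var_dump($holder->options['heartbeatInterval']); // int(250)
```

The boolean cast matters beyond documentation: the class compares flags with `=== true`, so a flag supplied as the string `'1'` would otherwise never match.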

        $this->isConnected = false;
        $this->hTimer = null;
        $this->rTimer = null;

        $this->connectCallback = $this->getSocketConnectorType($this->type);
        $this->disconnectCallback = $this->getSocketDisconnectorType($this->type);
        $this->socket = $this->getSocket();
        $this->buffer = $this->getBuffer();
        $this->connectionPool = $this->getConnectionPool();

        $this->setEventListener('messages', [ $this, 'onMessages' ]);
    }

    /**
     *
     */
    public function __destruct()
    {
        $this->stop();

        $this->removeEventListener('messages', [ $this, 'onMessages' ]);

        unset($this->context);
        unset($this->id);
        unset($this->endpoint);
        unset($this->type);
        unset($this->hosts);
        unset($this->flags);
        unset($this->options);
        unset($this->isConnected);
        unset($this->hTimer);
        unset($this->rTimer);

        unset($this->connectCallback);
        unset($this->disconnectCallback);
        unset($this->socket);
        unset($this->buffer);
        unset($this->connectionPool);
        unset($this->loop);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function start($blockEvent = false)
    {
        if ($this->isStarted())
        {
            return false;
        }

        $connect = $this->connectCallback;
        if (!$this->socket->$connect($this->endpoint))
        {
            $this->emit('error', [ new ExecutionException('socket not connected.') ]);
            return false;
        }

        $this->stopHeartbeat();
        $this->stopTimeRegister();

        $this->isConnected = true;

        $this->startHeartbeat();
        $this->startTimeRegister();

        $this->connectionPool->erase();
        $this->buffer->send();

        if (!$blockEvent)
        {
            $this->emit('start');
        }

        return true;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function stop($blockEvent = false)
    {
        if (!$this->isStarted())
        {
            return false;
        }

        $this->stopHeartbeat();
        $this->stopTimeRegister();

        $disconnect = $this->disconnectCallback;
        $this->socket->$disconnect($this->endpoint);

        $this->isConnected = false;

        if (!$blockEvent)
        {
            $this->emit('stop');
        }

        return true;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function unicast($id, $message, $flags = self::MODE_STANDARD)
    {
        $status = $this->sendMessage($id, self::COMMAND_MESSAGE, $message, $flags);

        $this->emit('send', [ $id, (array) $message ]);

        return $status;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function broadcast($message)
    {
        $conns = $this->getConnected();
        $statuses = [];

        foreach ($conns as $conn)
        {
            $statuses[] = $this->sendMessage($conn, self::COMMAND_MESSAGE, $message, self::MODE_STANDARD);
        }

        foreach ($conns as $conn)
        {
            $this->emit('send', [ $conn, (array) $message ]);
        }

        return $statuses;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isStarted()
    {
        return $this->isConnected;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isStopped()
    {
        return !$this->isConnected;
    }

    /**
     * @override
     * @inheritDoc
     */
    public function isConnected($id)
    {
        return $this->connectionPool->validateConnection($id);
    }

    /**
     * @override
     * @inheritDoc
     */
    public function getConnected()
    {
        return $this->connectionPool->getConnected();
    }

    /**
     * Set connection statically to be marked as online until specific timestamp.
     *
     * @param string $id
     * @param float $until
     */
    public function markConnectionOnline($id, $until)
    {
        $this->connectionPool->setConnectionProperty($id, 'timestampIn', $until);
    }

    /**
     * Set connection statically to be marked always as online.
     *
     * @param string $id
     */
    public function markConnectionPersistent($id)
    {
        $this->connectionPool->setConnectionProperty($id, 'timestampIn', 0);
    }

    /**
     * @return int
     */
    abstract protected function getSocketType();

    /**
     * @param string[] $multipartMessage
     * @return string[]
     */
    abstract protected function parseBinderMessage($multipartMessage);

    /**
     * @param string[] $multipartMessage
     * @return string[]
     */
    abstract protected function parseConnectorMessage($multipartMessage);

    /**
     * @param string $id
     * @param string $type
     * @return string[]
     */
    abstract protected function prepareBinderMessage($id, $type);

    /**
     * @param string $id
     * @param string $type
     * @return string[]
     */
    abstract protected function prepareConnectorMessage($id, $type);

    /**
     * @return ZmqSocket
     */
    protected function getSocket()
    {
        $socket = $this->context->getSocket($this->getSocketType());

        $socket->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, $this->id);
//        $socket->setSockOpt(\ZMQ::SOCKOPT_SNDHWM, $this->options['bufferSize']);
Unused Code Comprehensibility
65% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

//        $socket->setSockOpt(\ZMQ::SOCKOPT_RCVHWM, $this->options['bufferSize']);
        $socket->setSockOpt(\ZMQ::SOCKOPT_LINGER, $this->options['bufferTimeout']);

        return $socket;
    }

    /**
     * @return Buffer
     */
    protected function getBuffer()
    {
        return new Buffer($this->socket, $this->options['bufferSize']);
    }

    /**
     * @return ConnectionPool
     */
    protected function getConnectionPool()
    {
        return new ConnectionPool($this->options['heartbeatKeepalive'], $this->options['heartbeatInterval']);
    }

    /**
     * @param string $event
     * @param callable $callback
     */
    protected function setEventListener($event, callable $callback)
    {
        $this->socket->on($event, $callback);
    }

    /**
     * @param string $event
     * @param callable $callback
     */
    protected function removeEventListener($event, callable $callback)
    {
        $this->socket->removeListener($event, $callback);
    }

    /**
     * @param string[] $argv
     */
    public function onMessages($argv)
    {
        if ($this->type === self::BINDER)
        {
            list($id, $type, $message) = $this->parseBinderMessage($argv);
        }
        else if ($this->type === self::CONNECTOR)
        {
            list($id, $type, $message) = $this->parseConnectorMessage($argv);
        }
        else
        {
            return;
        }

        $conn = new Connection($id);

        switch ($type)
        {
            case self::COMMAND_HEARTBEAT:
                $this->onRecvHeartbeat($conn);
                break;

            case self::COMMAND_MESSAGE:
                $this->onRecvMessage($conn, $message);
                break;

            default:
                return;
        }
    }

    /**
     * @param int $type
     * @return int string
Should the return type not be string?

This check compares the return type specified in the @return annotation of a function or method doc comment with the types returned by the function and raises an issue if they mismatch.

     */
    private function getSocketConnectorType($type)
    {
        switch ($type)
        {
            case self::CONNECTOR:
                return 'connect';
            case self::BINDER:
                return 'bind';
            default:
                return 'fail';
        }
    }

    /**
     * @param int $type
     * @return int string
Should the return type not be string?

This check compares the return type specified in the @return annotation of a function or method doc comment with the types returned by the function and raises an issue if they mismatch.

     */
    private function getSocketDisconnectorType($type)
    {
        switch ($type)
        {
            case self::CONNECTOR:
                return 'disconnect';
            case self::BINDER:
                return 'unbind';
            default:
                return 'fail';
        }
    }

    /**
     * @param Connection $conn
     * @param string[] $message
     */
    private function onRecvMessage(Connection $conn, $message)
    {
        $this->recvMessage($conn, $message);
        $this->recvHeartbeat($conn);
    }

    /**
     * @param Connection $conn
     */
    private function onRecvHeartbeat(Connection $conn)
    {
        $this->recvHeartbeat($conn);
    }

    /**
     * @param Connection $conn
     * @param $message
     * @return mixed
     */
    private function recvMessage(Connection $conn, $message)
    {
        $this->emit('recv', [ $conn->id, $message ]);
    }

    /**
     * @param Connection $conn
     */
    private function recvHeartbeat(Connection $conn)
    {
        if ($this->flags['enableHeartbeat'] !== true)
        {
            return;
        }

        if ($this->connectionPool->setConnection($conn->id))
        {
            $this->emit('connect', [ $conn->getId() ]);
        }

        if ($this->type === self::BINDER)
        {
            $this->heartbeat($conn->id);
        }
    }

    /**
     *
     */
    private function fail()
    {
        return false;
    }

    /**
     * @param string $id
     * @return bool
     */
    private function heartbeat($id)
    {
        if ($this->connectionPool->isHeartbeatNeeded($id) === true)
        {
            return $this->sendMessage($id, self::COMMAND_HEARTBEAT);
        }

        return false;
    }

    /**
     *
     */
    private function startHeartbeat()
    {
        if ($this->hTimer === null && $this->flags['enableHeartbeat'])
        {
            $proxy = $this;
            $this->hTimer = $this->loop->addPeriodicTimer(($this->options['heartbeatInterval']/1000), function() use($proxy) {

                if ($proxy->type === self::CONNECTOR)
                {
                    foreach ($proxy->hosts as $hostid)
                    {
                        $proxy->heartbeat($hostid);
                    }
                }

                $this->clearConnectionPool();
            });
        }
    }

    /**
     *
     */
    private function clearConnectionPool()
    {
        $deleted = $this->connectionPool->removeInvalid();

        foreach ($deleted as $deletedid)
        {
            $this->emit('disconnect', [ $deletedid ]);
        }
    }

    /**
     *
     */
    private function stopHeartbeat()
    {
        if ($this->hTimer !== null)
        {
            $this->hTimer->cancel();
            $this->hTimer = null;
        }
    }

    /**
     * @param string $id
     * @param string $type
     * @param mixed $message
     * @return null|string[]
Should the return type not be null|array?

This check compares the return type specified in the @return annotation of a function or method doc comment with the types returned by the function and raises an issue if they mismatch.

     */
    private function getFrame($id, $type, $message)
    {
        if ($this->type === self::BINDER)
        {
            $frame = $this->prepareBinderMessage($id, $type);
        }
        else if ($this->type === self::CONNECTOR)
        {
            $frame = $this->prepareConnectorMessage($id, $type);
        }
        else
        {
            return null;
        }

        if ($message !== null)
        {
            if (is_object($message))
            {
                return null;
            }
            else if (!is_array($message))
            {
                $message = [ $message ];
            }

            $frame = array_merge($frame, $message);
        }

        return $frame;
    }

    /**
     * @param string $id
     * @param string $type
     * @param mixed $message
     * @param int $flags
     * @return bool
     */
    private function sendMessage($id, $type, $message = null, $flags = self::MODE_STANDARD)
    {
        if (($frame = $this->getFrame($id, $type, $message)) === null)
        {
            return false;
        }

        $isConnected = $this->isStarted();

        if (!$isConnected)
        {
            if ($this->flags['enableBuffering'] === true && ($flags & self::MODE_BUFFER_OFFLINE) === self::MODE_BUFFER_OFFLINE)
            {
                return $this->buffer->add($frame);
            }
        }
        else if ($type === self::COMMAND_HEARTBEAT)
Unused Code Bug
The strict comparison === seems to always evaluate to false as the types of $type (string) and self::COMMAND_HEARTBEAT (integer) can never be identical. Maybe you want to use a loose comparison == instead?
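
The analyser takes `string` from the `@param string $type` annotation on `sendMessage()`, although the callers shown in this file pass the integer constants `COMMAND_HEARTBEAT` and `COMMAND_MESSAGE`. The sketch below illustrates why `===` fails if a string really does arrive, and why correcting the annotation to `int` may be a safer fix than loosening the comparison to `==`. The function `isHeartbeat` is hypothetical; the constant copies the value from the class.

```php
<?php

// Value copied from ZmqModel::COMMAND_HEARTBEAT.
const COMMAND_HEARTBEAT = 1;

/**
 * Hypothetical check mirroring the heartbeat branch in sendMessage().
 *
 * @param int $type documented as int so the strict comparison is meaningful
 * @return bool
 */
function isHeartbeat($type)
{
    return $type === COMMAND_HEARTBEAT;
}

var_dump(isHeartbeat(1));         // bool(true)
var_dump(isHeartbeat('1'));       // bool(false): a string and an int are never identical
var_dump(isHeartbeat((int) '1')); // bool(true)
```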
        {
            if ($this->socket->closed === false && $this->socket->send($frame))
$frame is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP's automatic type juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest adding an explicit type cast, as in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
            {
                $this->connectionPool->registerHeartbeat($id);
                return true;
            }
        }
        else if (($this->flags['enableHeartbeat'] === false) || ($this->flags['enableBuffering'] === true && ($flags & self::MODE_BUFFER_ONLINE) === self::MODE_BUFFER_ONLINE) || ($this->connectionPool->validateConnection($id) === true))
        {
            $this->socket->send($frame);
$frame is of type array, but the function expects a string.

            $this->connectionPool->registerHeartbeat($id);
            return true;
        }

        return false;
    }

    /**
     * Start time register.
     *
     * Time register purpose is to cyclically increase timestamp representing last active tick of event loop. This method
     * allows model to not mark external sockets wrongly as offline because of its own heavy load.
     */
    private function startTimeRegister()
    {
        if ($this->rTimer === null && $this->flags['enableHeartbeat'] === true && $this->flags['enableTimeRegister'] === true)
        {
            $proxy = $this;
            $this->rTimer = $this->loop->addPeriodicTimer(($this->options['timeRegisterInterval']/1000), function() use($proxy) {
                $now = round(microtime(true)*1000);
                $proxy->connectionPool->setNow(function() use($now) {
                    return $now;
                });
            });
        }
    }

    /**
     * Stop time register.
     *
     * @see ZmqModel::startTimeRegister
     */
    private function stopTimeRegister()
    {
        if ($this->rTimer !== null)
        {
            $this->rTimer->cancel();
            $this->rTimer = null;
            $this->connectionPool->resetNow();
        }
    }
}