Channel::getModel()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Dazzle\Channel;
4
5
use Dazzle\Channel\Encoder\Encoder;
6
use Dazzle\Channel\Encoder\EncoderInterface;
7
use Dazzle\Channel\Extra\Request;
8
use Dazzle\Channel\Extra\Response;
9
use Dazzle\Channel\Model\ModelInterface;
10
use Dazzle\Channel\Protocol\Protocol;
11
use Dazzle\Channel\Protocol\ProtocolInterface;
12
use Dazzle\Channel\Record\RequestRecordStorage;
13
use Dazzle\Channel\Record\ResponseRecordStorage;
14
use Dazzle\Channel\Router\RouterCompositeInterface;
15
use Dazzle\Event\EventEmitter;
16
use Dazzle\Event\EventListener;
17
use Dazzle\Loop\Timer\TimerInterface;
18
use Dazzle\Loop\LoopAwareTrait;
19
use Dazzle\Loop\LoopInterface;
20
use Dazzle\Util\Support\GeneratorSupport;
21
use Dazzle\Util\Support\StringSupport;
22
use Dazzle\Util\Support\TimeSupport;
23
use Dazzle\Throwable\Exception\System\TaskIncompleteException;
24
use Dazzle\Throwable\Exception\Logic\InstantiationException;
25
use Dazzle\Throwable\Exception\LogicException;
26
use Dazzle\Throwable\Exception;
27
use Dazzle\Throwable\ThrowableProxy;
28
29
class Channel extends EventEmitter implements ChannelInterface
30
{
31
    use LoopAwareTrait;
32
    use RequestRecordStorage;
33
    use ResponseRecordStorage;
34
35
    /**
36
     * @var string
37
     */
38
    const TYPE_SND = 'SND';
39
40
    /**
41
     * @var string
42
     */
43
    const TYPE_REQ = 'REQ';
44
45
    /**
46
     * @var int
47
     */
48
    const BINDER = 1;
49
50
    /**
51
     * @var int
52
     */
53
    const CONNECTOR = 2;
54
55
    /**
56
     * @var int
57
     */
58
    const MODE_DEFAULT = 0;
59
60
    /**
61
     * @var int
62
     */
63
    const MODE_STANDARD = 0;
64
65
    /**
66
     * @var int
67
     */
68
    const MODE_BUFFER_ONLINE = 1;
69
70
    /**
71
     * @var int
72
     */
73
    const MODE_BUFFER_OFFLINE = 2;
74
75
    /**
76
     * @var int
77
     */
78
    const MODE_BUFFER = 3;
79
80
    /**
81
     * @var string
82
     */
83
    protected $name;
84
85
    /**
86
     * @var ModelInterface
87
     */
88
    protected $model;
89
90
    /**
91
     * @var RouterCompositeInterface
92
     */
93
    protected $router;
94
95
    /**
96
     * @var EncoderInterface
97
     */
98
    protected $encoder;
99
100
    /**
101
     * @var EventListener[]
102
     */
103
    protected $handlers;
104
105
    /**
106
     * @var string
107
     */
108
    protected $seed;
109
110
    /**
111
     * @var int
112
     */
113
    protected $counter;
114
115
    /**
116
     * @var TimerInterface
117
     */
118
    protected $reqsHelperTimer;
119
120
    /**
121
     * @var TimerInterface
122
     */
123
    protected $repsHelperTimer;
124
125
    /**
126
     * @param string $name
127
     * @param ModelInterface $model
128
     * @param RouterCompositeInterface $router
129
     * @param EncoderInterface $encoder
130
     * @param LoopInterface $loop
131
     * @throws InstantiationException
132
     */
133 84
    public function __construct($name, ModelInterface $model, RouterCompositeInterface $router, EncoderInterface $encoder, LoopInterface $loop)
134
    {
135 84
        parent::__construct($loop);
136
137
        try
138
        {
139 84
            $router->getBus('input');
140 84
            $router->getBus('output');
141
        }
142
        catch (Exception $ex)
143
        {
144
            throw new InstantiationException("Could not construct Dazzle\\Channel\\Channel due to Router wrong configuration.");
145
        }
146
147 84
        $this->name = $name;
148 84
        $this->model = $model;
149 84
        $this->router = $router;
150 84
        $this->encoder = $encoder;
151 84
        $this->loop = $loop;
152 84
        $this->handlers = [];
153 84
        $this->seed = GeneratorSupport::genId($this->name);
154 84
        $this->counter = 1e9;
0 ignored issues
show
Documentation Bug introduced by
The property $counter was declared of type integer, but 1000000000.0 is of type double. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
155 84
        $this->reqsHelperTimer = null;
156 84
        $this->repsHelperTimer = null;
157 84
        $this->handledRepsTimeout = 10e3;
0 ignored issues
show
Documentation Bug introduced by
The property $handledRepsTimeout was declared of type integer, but 10000.0 is of type double. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
158
159 84
        $this->registerEvents();
160 84
        $this->registerPeriodicTimers();
161 84
    }
162
163
    /**
164
     *
165
     */
166 6
    public function __destruct()
167
    {
168 6
        $this->unregisterEvents();
169 6
        $this->unregisterPeriodicTimers();
170
171 6
        unset($this->name);
172 6
        unset($this->model);
173 6
        unset($this->router);
174 6
        unset($this->encoder);
175 6
        unset($this->handlers);
176 6
        unset($this->seed);
177 6
        unset($this->counter);
178 6
        unset($this->reqsHelperTimer);
179 6
        unset($this->repsHelperTimer);
180 6
        unset($this->reqs);
181 6
        unset($this->reps);
182 6
        unset($this->handledReps);
183 6
        unset($this->handledRepsTimeout);
184 6
        unset($this->loop);
185 6
    }
186
187
    /**
188
     * @override
189
     * @inheritDoc
190
     */
191 1
    public function getName()
192
    {
193 1
        return $this->name;
194
    }
195
196
    /**
197
     * @override
198
     * @inheritDoc
199
     */
200 1
    public function getModel()
201
    {
202 1
        return $this->model;
203
    }
204
205
    /**
206
     * @override
207
     * @inheritDoc
208
     */
209 1
    public function getRouter()
210
    {
211 1
        return $this->router;
212
    }
213
214
    /**
215
     * @override
216
     * @inheritDoc
217
     */
218 1
    public function getInput()
219
    {
220 1
        return $this->router->getBus('input');
221
    }
222
223
    /**
224
     * @override
225
     * @inheritDoc
226
     */
227 1
    public function getOutput()
228
    {
229 1
        return $this->router->getBus('output');
230
    }
231
232
    /**
233
     * @override
234
     * @inheritDoc
235
     */
236 4 View Code Duplication
    public function createProtocol($message = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
237
    {
238 4
        if ($message === null)
239
        {
240 2
            $message = '';
241
        }
242 2
        else if (!is_array($message))
243
        {
244 2
            $message = (string) $message;
245
        }
246
247 4
        return new Protocol('', $this->genID(), '', $this->name, $message, '', $this->getTime());
0 ignored issues
show
Bug introduced by
It seems like $message defined by parameter $message on line 236 can also be of type array; however, Dazzle\Channel\Protocol\Protocol::__construct() does only seem to accept string, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
248
    }
249
250
    /**
251
     * @override
252
     * @inheritDoc
253
     */
254 1
    public function onStart(callable $handler)
255
    {
256 1
        return $this->on('start', $handler);
257
    }
258
259
    /**
260
     * @override
261
     * @inheritDoc
262
     */
263 1
    public function onStop(callable $handler)
264
    {
265 1
        return $this->on('stop', $handler);
266
    }
267
268
    /**
269
     * @override
270
     * @inheritDoc
271
     */
272 1
    public function onConnect(callable $handler)
273
    {
274 1
        return $this->on('connect', $handler);
275
    }
276
277
    /**
278
     * @override
279
     * @inheritDoc
280
     */
281 1
    public function onDisconnect(callable $handler)
282
    {
283 1
        return $this->on('disconnect', $handler);
284
    }
285
286
    /**
287
     * @override
288
     * @inheritDoc
289
     */
290 1
    public function onInput(callable $handler)
291
    {
292 1
        return $this->on('input', $handler);
293
    }
294
295
    /**
296
     * @override
297
     * @inheritDoc
298
     */
299 1
    public function onOutput(callable $handler)
300
    {
301 1
        return $this->on('output', $handler);
302
    }
303
304
    /**
305
     * @override
306
     * @inheritDoc
307
     */
308 1
    public function start()
309
    {
310 1
        $this->model->start();
311 1
    }
312
313
    /**
314
     * @override
315
     * @inheritDoc
316
     */
317 1
    public function stop()
318
    {
319 1
        $this->model->stop();
320 1
    }
321
322
    /**
323
     * @override
324
     * @inheritDoc
325
     */
326 2 View Code Duplication
    public function send($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
327
    {
328 2
        if ($success !== null || $failure !== null || $cancel !== null)
329
        {
330 1
            return $this->sendRequest($name, $message, $flags, $success, $failure, $cancel, $timeout);
331
        }
332
333 1
        return $this->sendAsync($name, $message, $flags);
334
    }
335
336
    /**
337
     * @override
338
     * @inheritDoc
339
     */
340 2 View Code Duplication
    public function push($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
341
    {
342 2
        if ($success !== null || $failure !== null || $cancel !== null)
343
        {
344 1
            return $this->pushRequest($name, $message, $flags, $success, $failure, $cancel, $timeout);
345
        }
346
347 1
        return $this->pushAsync($name, $message, $flags);
348
    }
349
350
    /**
351
     * @override
352
     * @inheritDoc
353
     */
354 5 View Code Duplication
    public function sendAsync($name, $message, $flags = Channel::MODE_DEFAULT)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
355
    {
356 5
        $protocol = $this->createMessageProtocol($message);
357 5
        $names = (array) $name;
358 5
        $handlers = [];
359
360 5
        foreach ($names as $name)
361
        {
362 4
            $handlers[] = $this->handleSendAsync($name, $protocol, $flags);
363
        }
364
365 5
        return !isset($handlers[0]) || isset($handlers[1]) ? $handlers : $handlers[0];
366
    }
367
368
    /**
369
     * @override
370
     * @inheritDoc
371
     */
372 5 View Code Duplication
    public function pushAsync($name, $message, $flags = Channel::MODE_DEFAULT)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
373
    {
374 5
        $protocol = $this->createMessageProtocol($message);
375 5
        $names = (array) $name;
376 5
        $handlers = [];
377
378 5
        foreach ($names as $name)
379
        {
380 4
            $handlers[] = $this->handlePushAsync($name, $protocol, $flags);
381
        }
382
383 5
        return !isset($handlers[0]) || isset($handlers[1]) ? $handlers : $handlers[0];
384
    }
385
386
    /**
387
     * @override
388
     * @inheritDoc
389
     */
390 5 View Code Duplication
    public function sendRequest($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
391
    {
392 5
        $protocol = $this->createMessageProtocol($message);
393 5
        $names = (array) $name;
394 5
        $handlers = [];
395
396 5
        foreach ($names as $name)
397
        {
398 4
            $handlers[] = $this->handleSendRequest($name, $protocol, $flags, $success, $failure, $cancel, $timeout);
399
        }
400
401 5
        return !isset($handlers[0]) || isset($handlers[1]) ? $handlers : $handlers[0];
402
    }
403
404
    /**
405
     * @override
406
     * @inheritDoc
407
     */
408 5 View Code Duplication
    public function pushRequest($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
409
    {
410 5
        $protocol = $this->createMessageProtocol($message);
411 5
        $names = (array) $name;
412 5
        $handlers = [];
413
414 5
        foreach ($names as $name)
415
        {
416 4
            $handlers[] = $this->handlePushRequest($name, $protocol, $flags, $success, $failure, $cancel, $timeout);
417
        }
418
419 5
        return !isset($handlers[0]) || isset($handlers[1]) ? $handlers : $handlers[0];
420
    }
421
422
    /**
423
     * @override
424
     * @inheritDoc
425
     */
426 5
    public function receive($sender, ProtocolInterface $protocol)
427
    {
428 5
        if ($this->handleReceiveRequest($protocol))
429
        {
430 2
            return;
431
        }
432
433 3
        if ($this->handleReceiveResponse($protocol) || $this->getInput()->handle($sender, $protocol))
434
        {
435 2
            $this->emit('input', [ $sender, $protocol ]);
436
        }
437 3
    }
438
439
    /**
440
     * @override
441
     * @inheritDoc
442
     */
443 1
    public function pull($sender, ProtocolInterface $protocol)
444
    {
445 1
        $this->emit('input', [ $sender, $protocol ]);
446 1
    }
447
448
    /**
449
     * @override
450
     * @inheritDoc
451
     */
452 1
    public function isStarted()
453
    {
454 1
        return $this->model->isStarted();
455
    }
456
457
    /**
458
     * @override
459
     * @inheritDoc
460
     */
461 1
    public function isStopped()
462
    {
463 1
        return $this->model->isStopped();
464
    }
465
466
    /**
467
     * @override
468
     * @inheritDoc
469
     */
470 2
    public function isConnected($name)
471
    {
472 2
        if (is_array($name))
473
        {
474 1
            $statuses = [];
475
476 1
            foreach ($name as $singleName)
477
            {
478 1
                $statuses[] = $this->model->isConnected($singleName);
479
            }
480
481 1
            return $statuses;
482
        }
483
484 1
        return $this->model->isConnected($name);
485
    }
486
487
    /**
488
     * @override
489
     * @inheritDoc
490
     */
491 2
    public function getConnected()
492
    {
493 2
        return $this->model->getConnected();
494
    }
495
496
    /**
497
     * @override
498
     * @inheritDoc
499
     */
500 1
    public function filterConnected($pattern)
501
    {
502 1
        return StringSupport::find($pattern, $this->getConnected());
503
    }
504
505
    /**
506
     * @override
507
     * @inheritDoc
508
     */
509 1 View Code Duplication
    protected function handleSendAsync($name, $message, $flags = Channel::MODE_DEFAULT)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
510
    {
511 1
        if ($message->getType() === '')
512
        {
513 1
            $message->setType(Channel::TYPE_SND);
514
        }
515 1
        if ($message->getDestination() === '')
516
        {
517 1
            $message->setDestination($name);
518
        }
519
520 1
        return $this->getOutput()->handle($name, $message, $flags);
521
    }
522
523
    /**
524
     * @override
525
     * @inheritDoc
526
     */
527
    protected function handlePushAsync($name, $message, $flags = Channel::MODE_DEFAULT)
528
    {
529
        if ($message->getType() === '')
530
        {
531
            $message->setType(Channel::TYPE_SND);
532
        }
533
        if ($message->getDestination() === '')
534
        {
535
            $message->setDestination($name);
536
        }
537
538
        $status = $this->model->unicast(
539
            $name,
540
            $this->encoder->with($message)->encode(),
541
            $flags
542
        );
543
544
        if ($status)
545
        {
546
            $this->resolveOrRejectResponse($message->getPid(), $message->getException());
547
            $this->emit('output', [ $name, $message ]);
548
        }
549
550
        return $status;
551
    }
552
553
    /**
554
     * @override
555
     * @inheritDoc
556
     */
557 1
    protected function handleSendRequest($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
558
    {
559 1
        if ($message->getType() === '')
560
        {
561 1
            $message->setType(Channel::TYPE_REQ);
562
        }
563 1
        if ($message->getDestination() === '')
564
        {
565 1
            $message->setDestination($name);
566
        }
567
568 1
        return $this->getOutput()->handle($name, $message, $flags, $success, $failure, $cancel, $timeout);
569
    }
570
571
    /**
572
     * @override
573
     * @inheritDoc
574
     */
575
    protected function handlePushRequest($name, $message, $flags = Channel::MODE_DEFAULT, callable $success = null, callable $failure = null, callable $cancel = null, $timeout = 0.0)
576
    {
577
        if ($message->getType() === '')
578
        {
579
            $message->setType(Channel::TYPE_REQ);
580
        }
581
        if ($message->getDestination() === '')
582
        {
583
            $message->setDestination($name);
584
        }
585
586
        $status = $this->model->unicast(
587
            $name,
588
            $this->encoder->with($message)->encode(),
589
            $flags
590
        );
591
592
        if (!$status)
593
        {
594
            if ($cancel !== null)
595
            {
596
                $cancel(new LogicException('Request could not be sent.'));
597
            }
598
599
            return null;
600
        }
601
602
        $pid = $message->getPid();
603
        $request = $this->createRequest($pid, $success, $failure, $cancel, $timeout);
604
        $this->addRequest($pid, $request);
605
606
        $this->emit('output', [ $name, $message ]);
607
608
        return $request;
609
    }
610
611
    /**
612
     * @override
613
     * @inheritDoc
614
     */
615
    public function handleReceive($sender, $multipartMessage)
616
    {
617
        $protocol = $this->encoder
618
            ->with(new Protocol())
619
            ->decode(implode('', $multipartMessage));
620
621
        if ($this->handleReceiveRequest($protocol))
622
        {
623
            return;
624
        }
625
626
        if ($this->handleReceiveResponse($protocol) || $this->getInput()->handle($sender, $protocol))
627
        {
628
            return;
629
        }
630
    }
631
632
    /**
633
     * @param ProtocolInterface
634
     * @return bool
635
     */
636
    protected function handleReceiveRequest(ProtocolInterface $protocol)
637
    {
638
        if ($protocol->getType() === Channel::TYPE_REQ && $protocol->getDestination() === $this->name)
639
        {
640
            $pid = $protocol->getPid();
641
            $timestamp = $protocol->getTimestamp();
642
            $now = $this->getTime();
643
644
            if ($timestamp <= $now || $this->existsResponse($pid))
645
            {
646
                return true;
647
            }
648
649
            $timestamp -= 5e3;
650
            $this->addResponse($pid, $this->createResponse($pid, $protocol->getOrigin(), $timestamp, $timestamp - $now));
651
        }
652
653
        return false;
654
    }
655
656
    /**
657
     * @param ProtocolInterface $protocol
658
     * @return bool
659
     */
660
    protected function handleReceiveResponse(ProtocolInterface $protocol)
661
    {
662
        $pid = $protocol->getPid();
663
664
        if (!$this->existsRequest($pid))
665
        {
666
            return false;
667
        }
668
669
        $message = $protocol->getMessage();
670
        $exception = $protocol->getException();
671
672
        if ($exception === '')
673
        {
674
            $this->resolveRequest($pid, $message);
675
        }
676
        else if ($exception === TaskIncompleteException::class)
677
        {
678
            $this->cancelRequest($pid, new ThrowableProxy([ $exception, $message ]));
679
        }
680
        else
681
        {
682
            $this->rejectRequest($pid, new ThrowableProxy([ $exception, $message ]));
683
        }
684
685
        return true;
686
    }
687
688
    /**
689
     * @return string
690
     */
691 7
    protected function genID()
692
    {
693 7
        return $this->seed . $this->getNextSuffix();
694
    }
695
696
    /**
697
     * @return float
698
     */
699 7
    protected function getTime()
700
    {
701 7
        return TimeSupport::now();
702
    }
703
704
    /**
705
     * @return string
706
     */
707 9 View Code Duplication
    protected function getNextSuffix()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
708
    {
709 9
        if ($this->counter > 2e9)
710
        {
711 1
            $this->counter = 1e9;
0 ignored issues
show
Documentation Bug introduced by
The property $counter was declared of type integer, but 1000000000.0 is of type double. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
712 1
            $this->seed = GeneratorSupport::genId($this->name);
713
        }
714
715 9
        return (string) $this->counter++;
716
    }
717
718
    /**
719
     * @param string|string[] $message
720
     * @return ProtocolInterface
721
     */
722 5
    protected function createMessageProtocol($message)
723
    {
724 5
        if (!($message instanceof ProtocolInterface))
725
        {
726 2
            $message = $this->createProtocol($message);
727
        }
728
        else
729
        {
730 3
            if ($message->getPid() === '')
731
            {
732 2
                $message->setPid($this->genID());
733
            }
734 3
            if ($message->getOrigin() === '')
735
            {
736 2
                $message->setOrigin($this->name);
737
            }
738 3
            if ($message->getTimestamp() == 0)
739
            {
740 2
                $message->setTimestamp($this->getTime());
741
            }
742
        }
743
744 5
        return $message;
745
    }
746
747
    /**
748
     *
749
     */
750 84
    private function registerPeriodicTimers()
751
    {
752
        $this->reqsHelperTimer = $this->getLoop()->addPeriodicTimer(0.1, function() {
753
            $this->expireRequests();
754 84
        });
755 84
        $this->repsHelperTimer = $this->getLoop()->addPeriodicTimer(0.1, function() {
756
            $this->expireResponses();
757
            $unfinished = $this->unfinishedResponses();
758
759
            foreach ($unfinished as $response)
760
            {
761
                $protocol = new Protocol('', $response->getPid(), '', $response->getAlias(), '', '', $this->getTime());
762
                $response = new Response($this, $protocol, new TaskIncompleteException("Task unfinished."));
763
                $response->call();
764
            }
765 84
        });
766 84
    }
767
768
    /**
769
     *
770
     */
771 6
    private function unregisterPeriodicTimers()
772
    {
773 6
        if ($this->reqsHelperTimer !== null)
774
        {
775
            $this->reqsHelperTimer->cancel();
776
        }
777
778 6
        if ($this->repsHelperTimer !== null)
779
        {
780
            $this->repsHelperTimer->cancel();
781
        }
782 6
    }
783
784
    /**
785
     *
786
     */
787 84
    private function registerEvents()
788
    {
789 84
        $this->handlers = $this->model->copyEvents($this, [ 'start', 'stop', 'connect', 'disconnect' ]);
790 84
        $this->handlers[] = $this->model->on('recv', [ $this, 'handleReceive' ]);
791 84
    }
792
793
    /**
794
     *
795
     */
796 6
    private function unregisterEvents()
797
    {
798 6
        foreach ($this->handlers as $handler)
799
        {
800 6
            $handler->cancel();
801
        }
802 6
    }
803
}
804