Consumer::background()   A
last analyzed

Complexity

Conditions 5
Paths 1

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 16
nc 1
nop 0
dl 0
loc 20
rs 9.4222
c 0
b 0
f 0
1
<?php
2
/**
3
 * NSQ consumer
4
 * User: moyo
5
 * Date: 15/11/2017
6
 * Time: 3:40 PM
7
 */
8
9
namespace Carno\NSQ;
10
11
use function Carno\Coroutine\async;
12
use Carno\Net\Endpoint;
13
use Carno\NSQ\Chips\Daemon;
14
use Carno\NSQ\Chips\EndpointsPersistent;
15
use Carno\NSQ\Chips\EndpointsRefresher;
16
use Carno\NSQ\Chips\INSLinker;
17
use Carno\NSQ\Chips\LookupAssigned;
18
use Carno\NSQ\Chips\N2Daemon;
19
use Carno\NSQ\Chips\TCAssigned;
20
use Carno\NSQ\Connector\Linker;
21
use Carno\NSQ\Contracts\Daemonize;
22
use Carno\NSQ\Types\Consuming;
23
use Carno\Promise\Promised;
24
25
class Consumer extends Daemon implements Daemonize
26
{
27
    use N2Daemon, LookupAssigned, TCAssigned, INSLinker;
28
    use EndpointsRefresher, EndpointsPersistent;
29
30
    /**
31
     * @var Consuming
32
     */
33
    private $consuming = null;
34
35
    /**
36
     * Consumer constructor.
37
     * @param Consuming $consuming
38
     */
39
    public function __construct(Consuming $consuming)
40
    {
41
        $this->consuming = $consuming;
42
    }
43
44
    /**
45
     * @return Promised
46
     */
47
    protected function background() : Promised
48
    {
49
        return async(function () {
50
            yield $this->syncedEndpoints(
51
                $this->getTopic(),
52
                function (int $action, Endpoint $endpoint) {
53
                    switch ($action) {
54
                        case Linker::ACT_JOIN:
55
                            yield $this->linking(
56
                                $endpoint,
57
                                $this->consuming->assigned($this->getTopic(), $this->getChannel())
58
                            )->connect();
59
                            break;
60
                        case Linker::ACT_LEAVE:
61
                            yield $this->linker($endpoint)->disconnect();
62
                            break;
63
                    }
64
                },
65
                $this->hasLookupd() ? $this->getLookupd() : null,
66
                $this->hasLookupd() ? [] : $this->staticEndpoints()
67
            );
68
        });
69
    }
70
}
71